Bladeren bron

re-add redis support, enabled by default

Mike L 2 jaren geleden
bovenliggende
commit
8b7274a063
18 gewijzigde bestanden met toevoegingen van 257 en 23 verwijderingen
  1. 5 1
      config.example.json
  2. 8 1
      dist/main.js
  3. 0 0
      dist/main.js.map
  4. 10 0
      dist/redis.d.ts
  5. 48 0
      dist/redis.js
  6. 0 0
      dist/redis.js.map
  7. 3 1
      dist/twitter.d.ts
  8. 43 6
      dist/twitter.js
  9. 0 0
      dist/twitter.js.map
  10. 1 1
      dist/webshot.d.ts
  11. 2 1
      dist/webshot.js
  12. 0 0
      dist/webshot.js.map
  13. 1 0
      package.json
  14. 7 1
      src/main.ts
  15. 6 0
      src/model.d.ts
  16. 69 0
      src/redis.ts
  17. 51 9
      src/twitter.ts
  18. 3 2
      src/webshot.ts

+ 5 - 1
config.example.json

@@ -13,5 +13,9 @@
   "work_interval": 60,
   "webshot_delay": 10000,
   "lockfile": "subscriber.lock",
-  "loglevel": "info"
+  "loglevel": "info",
+  "redis": true,
+  "redis_host": "127.0.0.1",
+  "redis_port": 6379,
+  "redis_expire_time": 43200
 }

+ 8 - 1
dist/main.js

@@ -1,5 +1,6 @@
 #!/usr/bin/env node
 "use strict";
+var _a;
 Object.defineProperty(exports, "__esModule", { value: true });
 const fs = require("fs");
 const path = require("path");
@@ -52,9 +53,10 @@ const requiredFields = [
 ];
 const warningFields = [
     'cq_ws_host', 'cq_ws_port', 'cq_access_token',
+    ...((_a = config.redis) !== null && _a !== void 0 ? _a : exampleConfig.redis) ? ['redis_host', 'redis_port', 'redis_expire_time'] : [],
 ];
 const optionalFields = [
-    'lockfile', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start',
+    'lockfile', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'redis',
 ].concat(warningFields);
 if (requiredFields.some((value) => config[value] === undefined)) {
     console.log(`${requiredFields.join(', ')} are required`);
@@ -128,6 +130,11 @@ const worker = new twitter_1.default({
     webshotDelay: config.webshot_delay,
     mode: config.mode,
     wsUrl: config.playwright_ws_spec_endpoint,
+    redis: !config.redis ? undefined : {
+        redisHost: config.redis_host,
+        redisPort: config.redis_port,
+        redisExpireTime: config.redis_expire_time,
+    },
 });
 worker.launch();
 qq.connect();

File diff suppressed because it is too large
+ 0 - 0
dist/main.js.map


+ 10 - 0
dist/redis.d.ts

@@ -0,0 +1,10 @@
+export default class {
+    private client;
+    private expireAfter;
+    constructor(opt: IRedisConfig);
+    private chatAsString;
+    cacheContent: (contentId: string, content: string) => Promise<void>;
+    cacheForChat: (postId: string, target: IChat) => Promise<void>;
+    getContent: (contentId: string) => Promise<string>;
+    isCachedForChat: (postId: string, target: IChat) => Promise<boolean>;
+}

+ 48 - 0
dist/redis.js

@@ -0,0 +1,48 @@
+"use strict";
+Object.defineProperty(exports, "__esModule", { value: true });
+const redis = require("redis");
+const loggers_1 = require("./loggers");
+const logger = loggers_1.getLogger('redis');
+class default_1 {
+    constructor(opt) {
+        this.chatAsString = (chat) => `${chat.chatType}:${chat.chatID.toString()}`;
+        this.cacheContent = (contentId, content) => new Promise((resolve, reject) => this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) => err ? reject(err) : resolve(res))).then(res => {
+            logger.debug(`cached content ${contentId}, result: ${res}`);
+        }).catch((err) => {
+            logger.error(`failed to cache content ${contentId}, error: ${err}`);
+        });
+        this.cacheForChat = (postId, target) => {
+            const targetStr = this.chatAsString(target);
+            return new Promise((resolve, reject) => this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) => err ? reject(err) : resolve(res))).then(res => {
+                logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
+            }).catch((err) => {
+                logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
+            });
+        };
+        this.getContent = (contentId) => new Promise((resolve, reject) => this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
+            logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
+            return res;
+        }).catch((err) => {
+            logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
+            throw err;
+        });
+        this.isCachedForChat = (postId, target) => {
+            const targetStr = this.chatAsString(target);
+            return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
+                logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
+                return Boolean(res);
+            }).catch((err) => {
+                logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
+                return false;
+            });
+        };
+        this.client = redis.createClient({
+            host: opt.redisHost,
+            port: opt.redisPort,
+        });
+        this.expireAfter = opt.redisExpireTime;
+        logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
+    }
+}
+exports.default = default_1;
+//# sourceMappingURL=redis.js.map

File diff suppressed because it is too large
+ 0 - 0
dist/redis.js.map


+ 3 - 1
dist/twitter.d.ts

@@ -12,6 +12,7 @@ interface IWorkerOption {
     accessTokenSecret: string;
     mode: number;
     wsUrl: string;
+    redis?: IRedisConfig;
 }
 export declare class ScreenNameNormalizer {
     static _queryUser: (username: string) => Promise<string>;
@@ -55,13 +56,14 @@ export default class {
     private webshot;
     private mode;
     private wsUrl;
+    private redis;
     constructor(opt: IWorkerOption);
     launch: () => void;
     queryUser: (username: string) => Promise<string>;
     queryTimelineReverse: (conf: ITimelineQueryConfig) => Promise<ITweet[]>;
     queryTimeline: ({ username, count, since, until, noreps, norts }: ITimelineQueryConfig) => Promise<ITweet[]>;
     private workOnTweets;
-    getTweet: (id: string, sender: (msg: string, text: string, author: string) => void) => Promise<void>;
+    getTweet: (id: string, sender: (id: string, msg: string, text: string, author: string) => void) => Promise<void[]>;
     private sendTweets;
     work: () => void;
 }

+ 43 - 6
dist/twitter.js

@@ -14,6 +14,7 @@ const fs = require("fs");
 const path = require("path");
 const Twitter = require("twitter");
 const loggers_1 = require("./loggers");
+const redis_1 = require("./redis");
 const utils_1 = require("./utils");
 const webshot_1 = require("./webshot");
 class ScreenNameNormalizer {
@@ -126,7 +127,23 @@ class default_1 {
             });
             return fetchTimeline();
         };
-        this.workOnTweets = (tweets, sendTweets) => this.webshot(tweets, sendTweets, this.webshotDelay);
+        this.workOnTweets = (tweets, sendTweets) => Promise.all(tweets.map(tweet => (this.redis ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
+            .then(content => {
+            if (content === null)
+                throw Error();
+            logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
+            const { msg, text, author } = JSON.parse(content);
+            sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
+        }).catch(() => this.webshot([tweet], (id, msg, text, author) => {
+            Promise.resolve()
+                .then(() => {
+                if (!this.redis)
+                    return;
+                logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
+                this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({ msg, text, author }));
+            })
+                .then(() => sendTweets(id, msg, text, author));
+        }, this.webshotDelay))));
         this.getTweet = (id, sender) => {
             const endpoint = 'statuses/show';
             const config = {
@@ -139,10 +156,11 @@ class default_1 {
                 return this.workOnTweets([tweet], sender);
             });
         };
-        this.sendTweets = (source, ...to) => (msg, text, author) => {
+        this.sendTweets = (config = { reportOnSkip: false }, ...to) => (id, msg, text, author) => {
             to.forEach(subscriber => {
-                logger.info(`pushing data${source ? ` of ${source}` : ''} to ${JSON.stringify(subscriber)}`);
-                retryOnError(() => this.bot.sendTo(subscriber, msg), (_, count, terminate) => {
+                const { sourceInfo: source, reportOnSkip } = config;
+                const targetStr = JSON.stringify(subscriber);
+                const send = () => retryOnError(() => this.bot.sendTo(subscriber, msg), (_, count, terminate) => {
                     if (count <= maxTrials) {
                         logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
                     }
@@ -150,6 +168,23 @@ class default_1 {
                         logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
                         terminate(this.bot.sendTo(subscriber, author + text, true));
                     }
+                }).then(() => {
+                    if (this.redis) {
+                        logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
+                        return this.redis.cacheForChat(id, subscriber);
+                    }
+                });
+                (this.redis ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
+                    .then(isCached => {
+                    if (isCached) {
+                        logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
+                        if (!reportOnSkip)
+                            return;
+                        text = `[最近发送过的推文:${id}]`;
+                        msg = author + text;
+                    }
+                    logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
+                    return send();
                 });
             });
         };
@@ -247,7 +282,7 @@ class default_1 {
                 }
                 if (currentThread.offset === '0')
                     tweets.splice(1);
-                return this.workOnTweets(tweets, this.sendTweets(`thread ${currentFeed}`, ...currentThread.subscribers))
+                return this.workOnTweets(tweets, this.sendTweets({ sourceInfo: `thread ${currentFeed}` }, ...currentThread.subscribers))
                     .then(updateDate).then(updateOffset);
             })
                 .then(() => {
@@ -274,9 +309,11 @@ class default_1 {
         this.webshotDelay = opt.webshotDelay;
         this.mode = opt.mode;
         this.wsUrl = opt.wsUrl;
+        if (opt.redis)
+            this.redis = new redis_1.default(opt.redis);
         ScreenNameNormalizer._queryUser = this.queryUser;
         exports.sendTweet = (id, receiver) => {
-            this.getTweet(id, this.sendTweets(`tweet ${id}`, receiver))
+            this.getTweet(id, this.sendTweets({ sourceInfo: `tweet ${id}`, reportOnSkip: true }, receiver))
                 .catch((err) => {
                 if (err[0].code !== 144) {
                     logger.warn(`error retrieving tweet: ${err[0].message}`);

File diff suppressed because it is too large
+ 0 - 0
dist/twitter.js.map


+ 1 - 1
dist/webshot.d.ts

@@ -10,6 +10,6 @@ declare class Webshot extends CallableInstance<[Tweets, (...args: any[]) => void
     private extendEntity;
     private renderWebshot;
     private fetchMedia;
-    webshot(tweets: Tweets, callback: (msgs: string, text: string, author: string) => void, webshotDelay: number): Promise<void>;
+    webshot(tweets: Tweets, callback: (twiId: string, msgs: string, text: string, author: string) => void, webshotDelay: number): Promise<void>;
 }
 export default Webshot;

+ 2 - 1
dist/webshot.js

@@ -352,7 +352,8 @@ class Webshot extends CallableInstance {
             promise.then(() => {
                 logger.info(`done working on ${twi.user.screen_name}/${twi.id_str}, message chain:`);
                 logger.info(JSON.stringify(koishi_1.Message.ellipseBase64(messageChain)));
-                callback(messageChain, xmlEntities.decode(text), author);
+                const twiId = twi.retweeted_status ? twi.retweeted_status.id_str : twi.id_str;
+                callback(twiId, messageChain, xmlEntities.decode(text), author);
             });
         });
         return promise;

File diff suppressed because it is too large
+ 0 - 0
dist/webshot.js.map


+ 1 - 0
package.json

@@ -38,6 +38,7 @@
     "playwright": "^1.11.0",
     "pngjs": "^6.0.0",
     "read-all-stream": "^3.1.0",
+    "redis": "^3.1.2",
     "sha1": "^1.1.1",
     "sharp": "^0.26.3",
     "temp": "^0.9.4",

+ 7 - 1
src/main.ts

@@ -62,10 +62,11 @@ const requiredFields = [
 
 const warningFields = [
   'cq_ws_host', 'cq_ws_port', 'cq_access_token',
+  ...(config.redis ?? exampleConfig.redis) ? ['redis_host', 'redis_port', 'redis_expire_time'] : [],
 ];
 
 const optionalFields = [
-  'lockfile', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start',
+  'lockfile', 'work_interval', 'webshot_delay', 'loglevel', 'mode', 'resume_on_start', 'redis',
 ].concat(warningFields);
 
 if (requiredFields.some((value) => config[value] === undefined)) {
@@ -142,6 +143,11 @@ const worker = new Worker({
   webshotDelay: config.webshot_delay,
   mode: config.mode,
   wsUrl: config.playwright_ws_spec_endpoint,
+  redis: !config.redis ? undefined : {
+    redisHost: config.redis_host,
+    redisPort: config.redis_port,
+    redisExpireTime: config.redis_expire_time,
+  },
 });
 worker.launch();
 

+ 6 - 0
src/model.d.ts

@@ -33,3 +33,9 @@ interface ILock {
     },
   };
 }
+
+interface IRedisConfig {
+  redisHost: string;
+  redisPort: number;
+  redisExpireTime: number;
+}

+ 69 - 0
src/redis.ts

@@ -0,0 +1,69 @@
+import * as redis from 'redis';
+import { getLogger } from './loggers';
+
+const logger = getLogger('redis');
+
+export default class {
+
+  private client: redis.RedisClient;
+  private expireAfter: number;
+
+  constructor(opt: IRedisConfig) {
+    this.client = redis.createClient({
+      host: opt.redisHost,
+      port: opt.redisPort,
+    });
+    this.expireAfter = opt.redisExpireTime;
+    logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
+  }
+
+  private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`;
+
+  public cacheContent = (contentId: string, content: string) =>
+    new Promise<'OK'>((resolve, reject) =>
+      this.client.set(`content/${contentId}`, content, 'EX', 3600 * 24, (err, res) =>
+        err ? reject(err) : resolve(res)
+      )
+    ).then(res => {
+      logger.debug(`cached content ${contentId}, result: ${res}`);
+    }).catch((err: Error) => {
+      logger.error(`failed to cache content ${contentId}, error: ${err}`);
+    });
+
+  public cacheForChat = (postId: string, target: IChat) => {
+    const targetStr = this.chatAsString(target);
+    return new Promise<'OK'>((resolve, reject) =>
+      this.client.set(`sent/${targetStr}/${postId}`, 'true', 'EX', this.expireAfter, (err, res) =>
+        err ? reject(err) : resolve(res)
+      )
+    ).then(res => {
+      logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
+    }).catch((err: Error) => {
+      logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
+    });
+  };
+
+  public getContent = (contentId: string) =>
+    new Promise<string>((resolve, reject) =>
+      this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))
+    ).then(res => {
+      logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
+      return res;
+    }).catch((err: Error) => {
+      logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
+      throw err;
+    });
+
+  public isCachedForChat = (postId: string, target: IChat) => {
+    const targetStr = this.chatAsString(target);
+    return new Promise<number>((resolve, reject) =>
+      this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))
+    ).then(res => {
+      logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
+      return Boolean(res);
+    }).catch((err: Error) => {
+      logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
+      return false;
+    });
+  };
+}

+ 51 - 9
src/twitter.ts

@@ -5,6 +5,7 @@ import TwitterTypes from 'twitter-d';
 
 import { getLogger } from './loggers';
 import QQBot from './koishi';
+import RedisSvc from './redis';
 import { chainPromises, BigNumOps } from './utils';
 import Webshot from './webshot';
 
@@ -20,6 +21,7 @@ interface IWorkerOption {
   accessTokenSecret: string;
   mode: number;
   wsUrl: string;
+  redis?: IRedisConfig;
 }
 
 export class ScreenNameNormalizer {
@@ -121,6 +123,7 @@ export default class {
   private webshot: Webshot;
   private mode: number;
   private wsUrl: string;
+  private redis: RedisSvc;
 
   constructor(opt: IWorkerOption) {
     this.client = new Twitter({
@@ -136,9 +139,10 @@ export default class {
     this.webshotDelay = opt.webshotDelay;
     this.mode = opt.mode;
     this.wsUrl = opt.wsUrl;
+    if (opt.redis) this.redis = new RedisSvc(opt.redis);
     ScreenNameNormalizer._queryUser = this.queryUser;
     sendTweet = (id, receiver) => {
-      this.getTweet(id, this.sendTweets(`tweet ${id}`, receiver))
+      this.getTweet(id, this.sendTweets({sourceInfo: `tweet ${id}`, reportOnSkip: true}, receiver))
         .catch((err: {code: number, message: string}[]) => {
           if (err[0].code !== 144) {
             logger.warn(`error retrieving tweet: ${err[0].message}`);
@@ -258,10 +262,28 @@ export default class {
 
   private workOnTweets = (
     tweets: Tweets,
-    sendTweets: (msg: string, text: string, author: string) => void
-  ) => this.webshot(tweets, sendTweets, this.webshotDelay);
-
-  public getTweet = (id: string, sender: (msg: string, text: string, author: string) => void) => {
+    sendTweets: (id: string, msg: string, text: string, author: string) => void
+  ) => Promise.all(tweets.map(tweet => 
+    (this.redis ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
+      .then(content => {
+        if (content === null) throw Error();
+        logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
+        const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
+        sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
+      }).catch(() =>
+        this.webshot([tweet], (id: string, msg: string, text: string, author: string) => {
+          Promise.resolve()
+            .then(() => {
+              if (!this.redis) return;
+              logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
+              this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({msg, text, author}));
+            })
+            .then(() => sendTweets(id, msg, text, author));
+        }, this.webshotDelay)
+      )
+  ));
+
+  public getTweet = (id: string, sender: (id: string, msg: string, text: string, author: string) => void) => {
     const endpoint = 'statuses/show';
     const config = {
       id,
@@ -274,10 +296,13 @@ export default class {
       });
   };
 
-  private sendTweets = (source?: string, ...to: IChat[]) => (msg: string, text: string, author: string) => {
+  private sendTweets = (
+    config: {sourceInfo?: string, reportOnSkip?: boolean} = {reportOnSkip: false}, ...to: IChat[]
+  ) => (id: string, msg: string, text: string, author: string) => {
     to.forEach(subscriber => {
-      logger.info(`pushing data${source ? ` of ${source}` : ''} to ${JSON.stringify(subscriber)}`);
-      retryOnError(
+      const {sourceInfo: source, reportOnSkip} = config;
+      const targetStr = JSON.stringify(subscriber);
+      const send = () => retryOnError(
         () => this.bot.sendTo(subscriber, msg),
         (_, count, terminate: (doNothing: Promise<void>) => void) => {
           if (count <= maxTrials) {
@@ -286,6 +311,23 @@ export default class {
             logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
             terminate(this.bot.sendTo(subscriber, author + text, true));
           }
+        }
+      ).then(() => {
+        if (this.redis) {
+          logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
+          return this.redis.cacheForChat(id, subscriber);
+        }
+      });
+      (this.redis ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
+        .then(isCached => {
+          if (isCached) {
+            logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
+            if (!reportOnSkip) return;
+            text = `[最近发送过的推文:${id}]`;
+            msg = author + text;
+          }
+          logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
+          return send();
         });
     });
   };
@@ -377,7 +419,7 @@ export default class {
       if (currentThread.offset === '-1') { updateOffset(); return; }
       if (currentThread.offset === '0') tweets.splice(1);
 
-      return this.workOnTweets(tweets, this.sendTweets(`thread ${currentFeed}`, ...currentThread.subscribers))
+      return this.workOnTweets(tweets, this.sendTweets({sourceInfo: `thread ${currentFeed}`}, ...currentThread.subscribers))
         .then(updateDate).then(updateOffset);
     })
       .then(() => {

+ 3 - 2
src/webshot.ts

@@ -280,7 +280,7 @@ class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Prom
 
   public webshot(
     tweets: Tweets,
-    callback: (msgs: string, text: string, author: string) => void,
+    callback: (twiId: string, msgs: string, text: string, author: string) => void,
     webshotDelay: number
   ): Promise<void> {
     let promise = new Promise<void>(resolve => {
@@ -383,7 +383,8 @@ class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Prom
       promise.then(() => {
         logger.info(`done working on ${twi.user.screen_name}/${twi.id_str}, message chain:`);
         logger.info(JSON.stringify(Message.ellipseBase64(messageChain)));
-        callback(messageChain, xmlEntities.decode(text), author);
+        const twiId = twi.retweeted_status ? twi.retweeted_status.id_str : twi.id_str;
+        callback(twiId, messageChain, xmlEntities.decode(text), author);
       });
     });
     return promise;

Some files were not shown because too many files changed in this diff