Browse Source

Merge branch 'koishi-redis-waiting' into mediaonly-koishi-redis-waiting

Mike L 3 năm trước cách đây
mục cha
commit
91d1801db6
7 tập tin đã thay đổi với 425 bổ sung92 xóa
  1. 138 2
      dist/redis.js
  2. 43 15
      dist/twitter.js
  3. 7 4
      dist/webshot.js
  4. 2 2
      src/command.ts
  5. 138 3
      src/redis.ts
  6. 87 58
      src/twitter.ts
  7. 10 8
      src/webshot.ts

+ 138 - 2
dist/redis.js

@@ -1,6 +1,6 @@
 "use strict";
 Object.defineProperty(exports, "__esModule", { value: true });
-const redis = require("redis");
+const redis_1 = require("redis");
 const loggers_1 = require("./loggers");
 const logger = loggers_1.getLogger('redis');
 class default_1 {
@@ -26,6 +26,87 @@ class default_1 {
             logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
             throw err;
         });
+        this.startProcess = (processId) => {
+            this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
+                if (err) {
+                    return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
+                }
+                logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
+            });
+            this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+                if (err)
+                    logger.error(`failed to subscribe to own process lock, error: ${err}`);
+            });
+        };
+        this.finishProcess = (processId) => {
+            this.client.publish(`twitter:lock/${processId}`, 'DONE');
+            this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
+                if (err)
+                    return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
+                logger.info(`successfully unsubscribed from process lock ${processId}`);
+            });
+        };
+        this.waitForProcess = (processId, timeout) => {
+            if (!(processId in this.subscriptions)) {
+                logger.debug(`creating new waiting function for ${processId}...`);
+                let timeoutHandle;
+                this.subscriptions[processId] = new Promise((resolve, reject) => {
+                    this.subscriber.on('message', (channel, message) => {
+                        logger.debug(`received status notification from channel ${processId}, status: ${message}`);
+                        if (channel === `twitter:lock/${processId}`) {
+                            if (message === 'DONE')
+                                return resolve();
+                            if (message === 'BREAK')
+                                return reject();
+                        }
+                        if (channel === `twitter:ping/${processId}`) {
+                            this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
+                                if (err) {
+                                    return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
+                                }
+                                logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
+                            });
+                            if (message === 'WIP') {
+                                this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+                                    if (err)
+                                        logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
+                                    logger.debug(`successfully subscribed to status channel of process ${processId}`);
+                                });
+                            }
+                            if (message === 'NONE')
+                                return resolve();
+                        }
+                    });
+                    timeoutHandle = setTimeout(() => {
+                        this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
+                            if (err) {
+                                logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
+                            }
+                            else {
+                                logger.warn(`timed out waiting for process ${processId}, give up waiting`);
+                            }
+                        });
+                    }, timeout);
+                    this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
+                        if (err) {
+                            logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
+                            reject();
+                        }
+                    });
+                    this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
+                        if (err) {
+                            return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
+                        }
+                        logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
+                    });
+                }).finally(() => {
+                    clearTimeout(timeoutHandle);
+                    logger.debug(`deleting waiting function for ${processId}...`);
+                    delete this.subscriptions[processId];
+                });
+            }
+            return this.subscriptions[processId];
+        };
         this.isCachedForChat = (postId, target) => {
             const targetStr = this.chatAsString(target);
             return new Promise((resolve, reject) => this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))).then(res => {
@@ -36,10 +117,65 @@ class default_1 {
                 return false;
             });
         };
-        this.client = redis.createClient({
+        this.client = redis_1.createClient({
             host: opt.redisHost,
             port: opt.redisPort,
         });
+        this.subscriber = this.client.duplicate();
+        this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
+            if (err) {
+                logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
+                process.exit(1);
+            }
+            logger.debug(`subscribers of global lock registry: ${reply[1]}`);
+            if (reply[1] > 0)
+                return;
+            this.subscriber.subscribe('twitter:locks', err => {
+                if (err) {
+                    logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
+                    process.exit(1);
+                }
+                logger.info(`nobody monitoring global lock registry, taken over now`);
+            });
+            this.subscriber.psubscribe(`twitter:lock/*`, err => {
+                if (err)
+                    return logger.error(`failed to subscribe to active process locks, error: ${err}`);
+                logger.debug(`monitoring all active locks`);
+            });
+            this.subscriber.on('message', (channel, message) => {
+                if (channel === 'twitter:locks') {
+                    const match = /^(WIP|PING):(.+)$/.exec(message);
+                    if (!match)
+                        return;
+                    const processId = match[2];
+                    if (match[1] === 'WIP') {
+                        this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+                            if (err)
+                                return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
+                            logger.info(`received notification from process ${processId}, accepting messages on channel...`);
+                        });
+                    }
+                    if (match[1] === 'PING') {
+                        logger.debug(`received ping request to process ${processId}, checking channel activity...`);
+                        this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
+                            if (err) {
+                                logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
+                            }
+                            const count = reply[1] || 0;
+                            const statusMsg = count > 0 ? 'WIP' : 'NONE';
+                            logger.debug(`status of channel ${processId}: ${statusMsg}`);
+                            this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
+                                if (err) {
+                                    return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
+                                }
+                                logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
+                            });
+                        });
+                    }
+                }
+            });
+        });
+        this.subscriptions = {};
         this.expireAfter = opt.redisExpireTime;
         logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
     }

+ 43 - 15
dist/twitter.js

@@ -127,37 +127,65 @@ class default_1 {
             });
             return fetchTimeline();
         };
-        this.workOnTweets = (tweets, sendTweets, refresh = false) => Promise.all(tweets.map(tweet => ((this.redis && !refresh) ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
+        this.workOnTweets = (tweets, sendTweets, refresh = false) => Promise.all(tweets.map(tweet => ((this.redis && !refresh) ?
+            this.redis.waitForProcess(`webshot/${tweet.id_str}`, this.webshotDelay * 4)
+                .then(() => this.redis.getContent(`webshot/${tweet.id_str}`)) :
+            Promise.reject())
             .then(content => {
             if (content === null)
                 throw Error();
             logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
             const { msg, text, author } = JSON.parse(content);
-            sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
-        }).catch(() => this.webshot([tweet], (id, msg, text, author) => {
+            let cacheId = tweet.id_str;
+            if (tweet.retweeted_status)
+                cacheId += `,rt:${tweet.retweeted_status.id_str}`;
+            sendTweets(cacheId, msg, text, author);
+            return null;
+        })
+            .catch(() => {
+            this.redis.startProcess(`webshot/${tweet.id_str}`);
+            return tweet;
+        }))).then(tweets => this.webshot(tweets.filter(t => t), (cacheId, msg, text, author) => {
             Promise.resolve()
                 .then(() => {
                 if (!this.redis)
                     return;
-                logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
-                this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({ msg, text, author }));
+                const twId = cacheId.split(',')[0];
+                logger.info(`caching webshot of tweet ${twId} to redis database`);
+                this.redis.cacheContent(`webshot/${twId}`, JSON.stringify({ msg, text, author }))
+                    .then(() => this.redis.finishProcess(`webshot/${twId}`));
             })
-                .then(() => sendTweets(id, msg, text, author));
-        }, this.webshotDelay))));
+                .then(() => sendTweets(cacheId, msg, text, author));
+        }, this.webshotDelay));
         this.getTweet = (id, sender, refresh = false) => {
             const endpoint = 'statuses/show';
             const config = {
                 id,
                 tweet_mode: 'extended',
             };
-            return this.client.get(endpoint, config)
+            return ((this.redis && !refresh) ?
+                this.redis.waitForProcess(`webshot/${id}`, this.webshotDelay * 4)
+                    .then(() => this.redis.getContent(`webshot/${id}`))
+                    .then(content => {
+                    if (content === null)
+                        throw Error();
+                    return { id_str: id };
+                }) :
+                Promise.reject())
+                .catch(() => this.client.get(endpoint, config))
                 .then((tweet) => {
-                logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
+                if (tweet.id) {
+                    logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
+                }
+                else {
+                    logger.debug(`skipped querying api as this tweet has been cached`);
+                }
                 return this.workOnTweets([tweet], sender, refresh);
             });
         };
         this.sendTweets = (config = { reportOnSkip: false, force: false }, ...to) => (id, msg, text, author) => {
             to.forEach(subscriber => {
+                const [twId, rtId] = (/(\d+)(?:,rt:(\d+))?/.exec(id) || []).slice(1);
                 const { sourceInfo: source, reportOnSkip, force } = config;
                 const targetStr = JSON.stringify(subscriber);
                 const send = () => retryOnError(() => this.bot.sendTo(subscriber, msg), (_, count, terminate) => {
@@ -170,17 +198,17 @@ class default_1 {
                     }
                 }).then(() => {
                     if (this.redis) {
-                        logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
-                        return this.redis.cacheForChat(id, subscriber);
+                        logger.info(`caching push status of tweet ${rtId ? `${rtId} (RT)` : twId} for ${targetStr}...`);
+                        return this.redis.cacheForChat(rtId || twId, subscriber);
                     }
                 });
-                ((this.redis && !force) ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
+                ((this.redis && !force) ? this.redis.isCachedForChat(rtId || twId, subscriber) : Promise.resolve(false))
                     .then(isCached => {
                     if (isCached) {
-                        logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
+                        logger.info(`skipped subscriber ${targetStr} as tweet ${rtId ? `${rtId} (or its RT)` : twId} has been sent already`);
                         if (!reportOnSkip)
                             return;
-                        text = `[最近发送过的推文:${id}]`;
+                        text = `[最近发送过的推文:${rtId || twId}]`;
                         msg = author + text;
                     }
                     logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
@@ -371,7 +399,7 @@ class default_1 {
 编号:${tweet.id_str}
 时间:${tweet.created_at}
 媒体:${tweet.extended_entities ? '有' : '无'}
-正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+( https:\/\/t.co\/.*)$/, '$1…$2')}`))
+正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`))
                 .concat(() => this.bot.sendTo(receiver, tweets.length ?
                 '时间线查询完毕,使用 /twitterpic_view <编号> 查看媒体推文详细内容。' :
                 '时间线查询完毕,没有找到符合条件的媒体推文。'))))

+ 7 - 4
dist/webshot.js

@@ -283,14 +283,15 @@ class Webshot extends CallableInstance {
         let promise = new Promise(resolve => {
             resolve();
         });
-        tweets.forEach(twi => {
-            promise = promise.then(() => {
+        tweets.forEach((twi, index) => {
+            promise = promise.then(() => util_1.promisify(setTimeout)(webshotDelay / 4 * index)).then(() => {
                 logger.info(`working on ${twi.user.screen_name}/${twi.id_str}`);
             });
             const originTwi = twi.retweeted_status || twi;
             let messageChain = '';
             let truncatedAt;
             let author = `${twi.user.name} (@${twi.user.screen_name}):\n`;
+            author += `${new Date(twi.created_at)}\n`;
             if (twi.retweeted_status)
                 author += `RT @${twi.retweeted_status.user.screen_name}: `;
             let text = originTwi.full_text;
@@ -387,8 +388,10 @@ class Webshot extends CallableInstance {
             promise.then(() => {
                 logger.info(`done working on ${twi.user.screen_name}/${twi.id_str}, message chain:`);
                 logger.info(JSON.stringify(koishi_1.Message.ellipseBase64(messageChain)));
-                const twiId = twi.retweeted_status ? twi.retweeted_status.id_str : twi.id_str;
-                callback(twiId, messageChain, xmlEntities.decode(text), author);
+                let cacheId = twi.id_str;
+                if (twi.retweeted_status)
+                    cacheId += `,rt:${twi.retweeted_status.id_str}`;
+                callback(cacheId, messageChain, xmlEntities.decode(text), author);
             });
         });
         return promise;

+ 2 - 2
src/command.ts

@@ -57,11 +57,11 @@ function linkBuilder(userName: string, more = ''): string {
 function linkFinder(checkedMatch: string[], chat: IChat, lock: ILock): [string, number] {
   const normalizedLink =
     linkBuilder(normalizer.normalize(checkedMatch[0]), checkedMatch[1]?.toLowerCase());
-  const link = Object.keys(lock.threads).find(realLink => 
+  const link = Object.keys(lock.threads).find(realLink =>
     normalizedLink === realLink.replace(/\/@/, '/').toLowerCase()
   );
   if (!link) return [null, -1];
-  const index = lock.threads[link].subscribers.findIndex(({chatID, chatType}) => 
+  const index = lock.threads[link].subscribers.findIndex(({chatID, chatType}) =>
     chat.chatID === chatID && chat.chatType === chatType
   );
   return [link, index];

+ 138 - 3
src/redis.ts

@@ -1,18 +1,76 @@
-import * as redis from 'redis';
+import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis';
 import { getLogger } from './loggers';
 
 const logger = getLogger('redis');
 
+interface Client extends Omit<Omit<RedisClient, 'pubsub'>, 'duplicate'> {
+  pubsub: OverloadedCommand<string | string[], (string | number)[], boolean>;
+  duplicate: (options?: ClientOpts, cb?: Callback<Client>) => Client;
+}
+
 export default class {
 
-  private client: redis.RedisClient;
+  private client: Client;
+  private subscriber: Client;
+  private subscriptions: {[key: string]: Promise<void>};
   private expireAfter: number;
 
   constructor(opt: IRedisConfig) {
-    this.client = redis.createClient({
+    this.client = createClient({
       host: opt.redisHost,
       port: opt.redisPort,
+    }) as unknown as Client;
+    this.subscriber = this.client.duplicate();
+    this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
+      if (err) {
+        logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
+        process.exit(1);
+      }
+      logger.debug(`subscribers of global lock registry: ${reply[1]}`);
+      if (reply[1] > 0) return;
+      this.subscriber.subscribe('twitter:locks', err => {
+        if (err) {
+          logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
+          process.exit(1);
+        }
+        logger.info(`nobody monitoring global lock registry, taken over now`);
+      });
+      this.subscriber.psubscribe(`twitter:lock/*`, err => {
+        if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`);
+        logger.debug(`monitoring all active locks`);
+      });
+      this.subscriber.on('message', (channel, message) => {
+        if (channel === 'twitter:locks') {
+          const match = /^(WIP|PING):(.+)$/.exec(message);
+          if (!match) return;
+          const processId = match[2];
+          if (match[1] === 'WIP') {
+            this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+              if (err) return logger.error(`failed to subscribe to process lock ${processId}, error: ${err}`);
+              logger.info(`received notification from process ${processId}, accepting messages on channel...`);
+            });
+          }
+          if (match[1] === 'PING') {
+            logger.debug(`received ping request to process ${processId}, checking channel activity...`);
+            this.client.pubsub('NUMSUB', `twitter:lock/${processId}`, (err, reply) => {
+              if (err) {
+                logger.error(`failed to query subscribers of process lock ${processId}, error: ${err}`);
+              }
+              const count = reply[1] || 0;
+              const statusMsg = count > 0 ? 'WIP' : 'NONE';
+              logger.debug(`status of channel ${processId}: ${statusMsg}`);
+              this.client.publish(`twitter:ping/${processId}`, statusMsg, err => {
+                if (err) {
+                  return logger.error(`failed to send response to subscribers of process lock ${processId}, error: ${err}`);
+                }
+                logger.info(`notified subscribers that process ${processId} ${count > 0 ? 'is running' : 'does not exist'}`);
+              });
+            });
+          }
+        }
+      });
     });
+    this.subscriptions = {};
     this.expireAfter = opt.redisExpireTime;
     logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
   }
@@ -54,6 +112,83 @@ export default class {
       throw err;
     });
 
+  public startProcess = (processId: string) => {
+    this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
+      if (err) {
+        return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
+      }
+      logger.debug(`notified subscription client to start tracking ${processId}, result: ${res}`);
+    });
+    this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+      if (err) logger.error(`failed to subscribe to own process lock, error: ${err}`);
+    });
+  }
+
+  public finishProcess = (processId: string) => {
+    this.client.publish(`twitter:lock/${processId}`, 'DONE');
+    this.subscriber.unsubscribe(`twitter:lock/${processId}`, err => {
+      if (err) return logger.error(`failed to unsubscribe from own process lock, error: ${err}`);
+      logger.info(`successfully unsubscribed from process lock ${processId}`);
+    });
+  }
+
+  public waitForProcess = (processId: string, timeout: number) => {
+    if (!(processId in this.subscriptions)) {
+      logger.debug(`creating new waiting function for ${processId}...`);
+      let timeoutHandle: ReturnType<typeof setTimeout>;
+      this.subscriptions[processId] = new Promise<void>((resolve, reject) => {
+        this.subscriber.on('message', (channel, message) => {
+          logger.debug(`received status notification from channel ${processId}, status: ${message}`);
+          if (channel === `twitter:lock/${processId}`) {
+            if (message === 'DONE') return resolve();
+            if (message === 'BREAK') return reject();
+          }
+          if (channel === `twitter:ping/${processId}`) {
+            this.subscriber.unsubscribe(`twitter:ping/${processId}`, err => {
+              if (err) {
+                return logger.error(`failed to unsubscribed from reply channel for pinging ${processId}, error: ${err}`);
+              }
+              logger.debug(`successfully unsubscribed from reply channel for pinging ${processId}`);
+            });
+            if (message === 'WIP') {
+              this.subscriber.subscribe(`twitter:lock/${processId}`, err => {
+                if (err) logger.error(`failed to subscribe to status channel of process ${processId}, error: ${err}`);
+                logger.debug(`successfully subscribed to status channel of process ${processId}`);
+              });
+            }
+            if (message === 'NONE') return resolve();
+          }
+        });
+        timeoutHandle = setTimeout(() => {
+          this.client.publish(`twitter:lock/${processId}`, 'BREAK', err => {
+            if (err) {
+              logger.error(`failed while calling to remove process lock ${processId}, error: ${err}`);
+            } else {
+              logger.warn(`timed out waiting for process ${processId}, give up waiting`);
+            }
+          });
+        }, timeout);
+        this.subscriber.subscribe(`twitter:ping/${processId}`, err => {
+          if (err) {
+            logger.error(`failed to subscribe to reply channel for pinging ${processId}, error: ${err}`);
+            reject();
+          }
+        })
+        this.client.publish(`twitter:locks`, `PING:${processId}`, (err, res) => {
+          if (err) {
+            return logger.error(`error pinging process ${processId} via subscription client, result: ${res}`);
+          }
+          logger.debug(`pinged process ${processId} via subscription client, result: ${res}`);
+        });
+      }).finally(() => {
+        clearTimeout(timeoutHandle);
+        logger.debug(`deleting waiting function for ${processId}...`);
+        delete this.subscriptions[processId];
+      })
+    }
+    return this.subscriptions[processId];
+  };
+
   public isCachedForChat = (postId: string, target: IChat) => {
     const targetStr = this.chatAsString(target);
     return new Promise<number>((resolve, reject) =>

+ 87 - 58
src/twitter.ts

@@ -104,14 +104,11 @@ export type Entities = TwitterTypes.Entities;
 export type ExtendedEntities = TwitterTypes.ExtendedEntities;
 export type MediaEntity = TwitterTypes.MediaEntity;
 
-interface ITweet extends TwitterTypes.Status {
+export interface Tweet extends TwitterTypes.Status {
   user: FullUser;
   retweeted_status?: Tweet;
 }
 
-export type Tweet = ITweet;
-export type Tweets = ITweet[];
-
 export default class {
 
   private client: Twitter;
@@ -180,7 +177,7 @@ export default class {
 编号:${tweet.id_str}
 时间:${tweet.created_at}
 媒体:${tweet.extended_entities ? '有' : '无'}
-正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+( https:\/\/t.co\/.*)$/, '$1…$2')}`
+正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`
           ))
             .concat(() => this.bot.sendTo(receiver, tweets.length ?
               '时间线查询完毕,使用 /twitterpic_view <编号> 查看媒体推文详细内容。' :
@@ -205,9 +202,8 @@ export default class {
     );
   };
 
-  public queryUser = (username: string) =>
-    this.client.get('users/show', {screen_name: username})
-      .then((user: FullUser) => user.screen_name);
+  public queryUser = (username: string) => this.client.get('users/show', {screen_name: username})
+    .then((user: FullUser) => user.screen_name);
 
   public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
     if (!conf.since) return this.queryTimeline(conf);
@@ -217,7 +213,7 @@ export default class {
     const until = () =>
       BigNumOps.min(maxID, BigNumOps.plus(conf.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
     conf.until = until();
-    const promise = (tweets: ITweet[]): Promise<ITweet[]> =>
+    const promise = (tweets: Tweet[]): Promise<Tweet[]> =>
       this.queryTimeline(conf).then(newTweets => {
         tweets = newTweets.concat(tweets);
         conf.since = conf.until;
@@ -255,71 +251,104 @@ export default class {
         max_id: until,
         tweet_mode: 'extended',
       },
-      tweets: ITweet[] = []
-    ): Promise<ITweet[]> =>
-      this.client.get('statuses/user_timeline', config)
-        .then((newTweets: ITweet[]) => {
-          if (newTweets.length) {
-            logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
-            config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
-            logger.info(`timeline query of ${username} yielded ${
-              newTweets.length
-            } new tweets, next query will start at offset ${config.max_id}`);
-            tweets.push(...newTweets.filter(tweet => tweet.extended_entities));
-          }
-          if (!newTweets.length || tweets.length >= count) {
-            logger.info(`timeline query of ${username} finished successfully, ${
-              tweets.length
-            } tweets with extended entities have been fetched`);
-            return tweets.slice(0, count);
-          }
-          return fetchTimeline(config, tweets);
-        });
+      tweets: Tweet[] = []
+    ): Promise<Tweet[]> => this.client.get('statuses/user_timeline', config)
+      .then((newTweets: Tweet[]) => {
+        if (newTweets.length) {
+          logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
+          config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
+          logger.info(`timeline query of ${username} yielded ${
+            newTweets.length
+          } new tweets, next query will start at offset ${config.max_id}`);
+          tweets.push(...newTweets.filter(tweet => tweet.extended_entities));
+        }
+        if (!newTweets.length || tweets.length >= count) {
+          logger.info(`timeline query of ${username} finished successfully, ${
+            tweets.length
+          } tweets with extended entities have been fetched`);
+          return tweets.slice(0, count);
+        }
+        return fetchTimeline(config, tweets);
+      });
     return fetchTimeline();
   };
 
   private workOnTweets = (
-    tweets: Tweets,
-    sendTweets: (id: string, msg: string, text: string, author: string) => void,
+    tweets: Tweet[],
+    sendTweets: (cacheId: string, msg: string, text: string, author: string) => void,
     refresh = false
-  ) => Promise.all(tweets.map(tweet => 
-    ((this.redis && !refresh) ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
+  ) => Promise.all(tweets.map(tweet =>
+    ((this.redis && !refresh) ?
+      this.redis.waitForProcess(`webshot/${tweet.id_str}`, this.webshotDelay * 4)
+        .then(() => this.redis.getContent(`webshot/${tweet.id_str}`)) :
+      Promise.reject())
       .then(content => {
         if (content === null) throw Error();
         logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
         const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
-        sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
-      }).catch(() =>
-        this.webshot([tweet], (id: string, msg: string, text: string, author: string) => {
-          Promise.resolve()
-            .then(() => {
-              if (!this.redis) return;
-              logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
-              this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({msg, text, author}));
-            })
-            .then(() => sendTweets(id, msg, text, author));
-        }, this.webshotDelay)
-      )
-  ));
+        let cacheId = tweet.id_str;
+        if (tweet.retweeted_status) cacheId += `,rt:${tweet.retweeted_status.id_str}`;
+        sendTweets(cacheId, msg, text, author);
+        return null as Tweet;
+      })
+      .catch(() => {
+        this.redis.startProcess(`webshot/${tweet.id_str}`);
+        return tweet;
+      })
+  )).then(tweets =>
+    this.webshot(
+      tweets.filter(t => t),
+      (cacheId: string, msg: string, text: string, author: string) => {
+        Promise.resolve()
+          .then(() => {
+            if (!this.redis) return;
+            const twId = cacheId.split(',')[0];
+            logger.info(`caching webshot of tweet ${twId} to redis database`);
+            this.redis.cacheContent(`webshot/${twId}`, JSON.stringify({msg, text, author}))
+              .then(() => this.redis.finishProcess(`webshot/${twId}`));
+          })
+          .then(() => sendTweets(cacheId, msg, text, author));
+      },
+      this.webshotDelay
+    )
+  );
 
-  public getTweet = (id: string, sender: (id: string, msg: string, text: string, author: string) => void, refresh = false) => {
+  public getTweet = (
+    id: string,
+    sender: (cacheId: string, msg: string, text: string, author: string) => void,
+    refresh = false
+  ) => {
     const endpoint = 'statuses/show';
     const config = {
       id,
       tweet_mode: 'extended',
     };
-    return this.client.get(endpoint, config)
+    return ((this.redis && !refresh) ?
+      this.redis.waitForProcess(`webshot/${id}`, this.webshotDelay * 4)
+        .then(() => this.redis.getContent(`webshot/${id}`))
+        .then(content => {
+          if (content === null) throw Error();
+          return {id_str: id} as Tweet;
+        }) :
+      Promise.reject())
+      .catch(() => this.client.get(endpoint, config))
       .then((tweet: Tweet) => {
-        logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
+        if (tweet.id) {
+          logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
+        } else {
+          logger.debug(`skipped querying api as this tweet has been cached`)
+        }
         return this.workOnTweets([tweet], sender, refresh);
       });
   };
 
   private sendTweets = (
-    config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean} = {reportOnSkip: false, force: false},
+    config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean}
+      = {reportOnSkip: false, force: false},
     ...to: IChat[]
   ) => (id: string, msg: string, text: string, author: string) => {
     to.forEach(subscriber => {
+      const [twId, rtId] =  (/(\d+)(?:,rt:(\d+))?/.exec(id) || []).slice(1);
       const {sourceInfo: source, reportOnSkip, force} = config;
       const targetStr = JSON.stringify(subscriber);
       const send = () => retryOnError(
@@ -334,16 +363,16 @@ export default class {
         }
       ).then(() => {
         if (this.redis) {
-          logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
-          return this.redis.cacheForChat(id, subscriber);
+          logger.info(`caching push status of tweet ${rtId ? `${rtId} (RT)` : twId} for ${targetStr}...`);
+          return this.redis.cacheForChat(rtId || twId, subscriber);
         }
       });
-      ((this.redis && !force) ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
+      ((this.redis && !force) ? this.redis.isCachedForChat(rtId || twId, subscriber) : Promise.resolve(false))
         .then(isCached => {
           if (isCached) {
-            logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
+            logger.info(`skipped subscriber ${targetStr} as tweet ${rtId ? `${rtId} (or its RT)` : twId} has been sent already`);
             if (!reportOnSkip) return;
-            text = `[最近发送过的推文:${id}]`;
+            text = `[最近发送过的推文:${rtId || twId}]`;
             msg = author + text;
           }
           logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
@@ -411,8 +440,8 @@ export default class {
         config.include_rts = false;
         if (offset as unknown as number > 0) config.since_id = offset;
         if (offset as unknown as number < -1) config.max_id = offset.slice(1);
-        const getMore = (gotTweets: Tweets = []) => this.client.get(
-          endpoint, config, (error: {[key: string]: any}[], tweets: Tweets
+        const getMore = (gotTweets: Tweet[] = []) => this.client.get(
+          endpoint, config, (error: {[key: string]: any}[], tweets: Tweet[]
         ) => {
           if (error) {
             if (error instanceof Array && error.length > 0 && error[0].code === 34) {
@@ -433,7 +462,7 @@ export default class {
       }
     });
 
-    promise.then((tweets: Tweets) => {
+    promise.then((tweets: Tweet[]) => {
       logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
       const currentThread = lock.threads[currentFeed];
 

+ 10 - 8
src/webshot.ts

@@ -12,7 +12,7 @@ import * as temp from 'temp';
 
 import { getLogger } from './loggers';
 import { Message } from './koishi';
-import { MediaEntity, Tweets } from './twitter';
+import { MediaEntity, Tweet } from './twitter';
 import { chainPromises } from './utils';
 
 const xmlEntities = new XmlEntities();
@@ -30,7 +30,7 @@ const typeInZH = {
 
 const logger = getLogger('webshot');
 
-class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Promise<void>> {
+class Webshot extends CallableInstance<[Tweet[], (...args) => void, number], Promise<void>> {
 
   private browser: puppeteer.Browser;
   private mode: number;
@@ -309,15 +309,15 @@ class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Prom
   );
 
   public webshot(
-    tweets: Tweets,
-    callback: (twiId: string, msgs: string, text: string, author: string) => void,
+    tweets: Tweet[],
+    callback: (cacheId: string, msgs: string, text: string, author: string) => void,
     webshotDelay: number
   ): Promise<void> {
     let promise = new Promise<void>(resolve => {
       resolve();
     });
-    tweets.forEach(twi => {
-      promise = promise.then(() => {
+    tweets.forEach((twi, index) => {
+      promise = promise.then(() => promisify(setTimeout)(webshotDelay / 4 * index)).then(() => {
         logger.info(`working on ${twi.user.screen_name}/${twi.id_str}`);
       });
       const originTwi = twi.retweeted_status || twi;
@@ -326,6 +326,7 @@ class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Prom
 
       // text processing
       let author = `${twi.user.name} (@${twi.user.screen_name}):\n`;
+      author += `${new Date(twi.created_at)}\n`;
       if (twi.retweeted_status) author += `RT @${twi.retweeted_status.user.screen_name}: `;
 
       let text = originTwi.full_text;
@@ -425,8 +426,9 @@ class Webshot extends CallableInstance<[Tweets, (...args) => void, number], Prom
       promise.then(() => {
         logger.info(`done working on ${twi.user.screen_name}/${twi.id_str}, message chain:`);
         logger.info(JSON.stringify(Message.ellipseBase64(messageChain)));
-        const twiId = twi.retweeted_status ? twi.retweeted_status.id_str : twi.id_str;
-        callback(twiId, messageChain, xmlEntities.decode(text), author);
+        let cacheId = twi.id_str;
+        if (twi.retweeted_status) cacheId += `,rt:${twi.retweeted_status.id_str}`;
+        callback(cacheId, messageChain, xmlEntities.decode(text), author);
       });
     });
     return promise;