|
@@ -104,14 +104,11 @@ export type Entities = TwitterTypes.Entities;
|
|
export type ExtendedEntities = TwitterTypes.ExtendedEntities;
|
|
export type ExtendedEntities = TwitterTypes.ExtendedEntities;
|
|
export type MediaEntity = TwitterTypes.MediaEntity;
|
|
export type MediaEntity = TwitterTypes.MediaEntity;
|
|
|
|
|
|
-interface ITweet extends TwitterTypes.Status {
|
|
|
|
|
|
+export interface Tweet extends TwitterTypes.Status {
|
|
user: FullUser;
|
|
user: FullUser;
|
|
retweeted_status?: Tweet;
|
|
retweeted_status?: Tweet;
|
|
}
|
|
}
|
|
|
|
|
|
-export type Tweet = ITweet;
|
|
|
|
-export type Tweets = ITweet[];
|
|
|
|
-
|
|
|
|
export default class {
|
|
export default class {
|
|
|
|
|
|
private client: Twitter;
|
|
private client: Twitter;
|
|
@@ -213,20 +210,22 @@ export default class {
|
|
const count = conf.count;
|
|
const count = conf.count;
|
|
const maxID = conf.until;
|
|
const maxID = conf.until;
|
|
conf.count = undefined;
|
|
conf.count = undefined;
|
|
- const until = () => BigNumOps.min(maxID, BigNumOps.plus(conf.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
|
|
|
|
|
|
+ const until = () =>
|
|
|
|
+ BigNumOps.min(maxID, BigNumOps.plus(conf.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
|
|
conf.until = until();
|
|
conf.until = until();
|
|
- const promise = (tweets: ITweet[]): Promise<ITweet[]> =>this.queryTimeline(conf).then(newTweets => {
|
|
|
|
- tweets = newTweets.concat(tweets);
|
|
|
|
- conf.since = conf.until;
|
|
|
|
- conf.until = until();
|
|
|
|
- if (
|
|
|
|
- tweets.length >= count ||
|
|
|
|
|
|
+ const promise = (tweets: Tweet[]): Promise<Tweet[]> =>
|
|
|
|
+ this.queryTimeline(conf).then(newTweets => {
|
|
|
|
+ tweets = newTweets.concat(tweets);
|
|
|
|
+ conf.since = conf.until;
|
|
|
|
+ conf.until = until();
|
|
|
|
+ if (
|
|
|
|
+ tweets.length >= count ||
|
|
BigNumOps.compare(conf.since, conf.until) >= 0
|
|
BigNumOps.compare(conf.since, conf.until) >= 0
|
|
- ) {
|
|
|
|
- return tweets.slice(-count);
|
|
|
|
- }
|
|
|
|
- return promise(tweets);
|
|
|
|
- });
|
|
|
|
|
|
+ ) {
|
|
|
|
+ return tweets.slice(-count);
|
|
|
|
+ }
|
|
|
|
+ return promise(tweets);
|
|
|
|
+ });
|
|
return promise([]);
|
|
return promise([]);
|
|
};
|
|
};
|
|
|
|
|
|
@@ -252,9 +251,9 @@ export default class {
|
|
max_id: until,
|
|
max_id: until,
|
|
tweet_mode: 'extended',
|
|
tweet_mode: 'extended',
|
|
},
|
|
},
|
|
- tweets: ITweet[] = []
|
|
|
|
- ): Promise<ITweet[]> => this.client.get('statuses/user_timeline', config)
|
|
|
|
- .then((newTweets: ITweet[]) => {
|
|
|
|
|
|
+ tweets: Tweet[] = []
|
|
|
|
+ ): Promise<Tweet[]> => this.client.get('statuses/user_timeline', config)
|
|
|
|
+ .then((newTweets: Tweet[]) => {
|
|
if (newTweets.length) {
|
|
if (newTweets.length) {
|
|
logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
|
|
logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
|
|
config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
|
|
config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
|
|
@@ -275,47 +274,81 @@ export default class {
|
|
};
|
|
};
|
|
|
|
|
|
private workOnTweets = (
|
|
private workOnTweets = (
|
|
- tweets: Tweets,
|
|
|
|
- sendTweets: (id: string, msg: string, text: string, author: string) => void,
|
|
|
|
|
|
+ tweets: Tweet[],
|
|
|
|
+ sendTweets: (cacheId: string, msg: string, text: string, author: string) => void,
|
|
refresh = false
|
|
refresh = false
|
|
- ) => Promise.all(tweets.map(tweet =>
|
|
|
|
- ((this.redis && !refresh) ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
|
|
|
|
|
|
+ ) => Promise.all(tweets.map(tweet =>
|
|
|
|
+ ((this.redis && !refresh) ?
|
|
|
|
+ this.redis.waitForProcess(`webshot/${tweet.id_str}`, this.webshotDelay * 4)
|
|
|
|
+ .then(() => this.redis.getContent(`webshot/${tweet.id_str}`)) :
|
|
|
|
+ Promise.reject())
|
|
.then(content => {
|
|
.then(content => {
|
|
if (content === null) throw Error();
|
|
if (content === null) throw Error();
|
|
logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
|
|
logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
|
|
const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
|
|
const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
|
|
- sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
|
|
|
|
- }).catch(() =>
|
|
|
|
- this.webshot([tweet], (id: string, msg: string, text: string, author: string) => {
|
|
|
|
- Promise.resolve()
|
|
|
|
- .then(() => {
|
|
|
|
- if (!this.redis) return;
|
|
|
|
- logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
|
|
|
|
- this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({msg, text, author}));
|
|
|
|
- })
|
|
|
|
- .then(() => sendTweets(id, msg, text, author));
|
|
|
|
- }, this.webshotDelay)
|
|
|
|
- )
|
|
|
|
- ));
|
|
|
|
|
|
+ let cacheId = tweet.id_str;
|
|
|
|
+ if (tweet.retweeted_status) cacheId += `,rt:${tweet.retweeted_status.id_str}`;
|
|
|
|
+ sendTweets(cacheId, msg, text, author);
|
|
|
|
+ return null as Tweet;
|
|
|
|
+ })
|
|
|
|
+ .catch(() => {
|
|
|
|
+ this.redis.startProcess(`webshot/${tweet.id_str}`);
|
|
|
|
+ return tweet;
|
|
|
|
+ })
|
|
|
|
+ )).then(tweets =>
|
|
|
|
+ this.webshot(
|
|
|
|
+ tweets.filter(t => t),
|
|
|
|
+ (cacheId: string, msg: string, text: string, author: string) => {
|
|
|
|
+ Promise.resolve()
|
|
|
|
+ .then(() => {
|
|
|
|
+ if (!this.redis) return;
|
|
|
|
+ const twId = cacheId.split(',')[0];
|
|
|
|
+ logger.info(`caching webshot of tweet ${twId} to redis database`);
|
|
|
|
+ this.redis.cacheContent(`webshot/${twId}`, JSON.stringify({msg, text, author}))
|
|
|
|
+ .then(() => this.redis.finishProcess(`webshot/${twId}`));
|
|
|
|
+ })
|
|
|
|
+ .then(() => sendTweets(cacheId, msg, text, author));
|
|
|
|
+ },
|
|
|
|
+ this.webshotDelay
|
|
|
|
+ )
|
|
|
|
+ );
|
|
|
|
|
|
- public getTweet = (id: string, sender: (id: string, msg: string, text: string, author: string) => void, refresh = false) => {
|
|
|
|
|
|
+ public getTweet = (
|
|
|
|
+ id: string,
|
|
|
|
+ sender: (cacheId: string, msg: string, text: string, author: string) => void,
|
|
|
|
+ refresh = false
|
|
|
|
+ ) => {
|
|
const endpoint = 'statuses/show';
|
|
const endpoint = 'statuses/show';
|
|
const config = {
|
|
const config = {
|
|
id,
|
|
id,
|
|
tweet_mode: 'extended',
|
|
tweet_mode: 'extended',
|
|
};
|
|
};
|
|
- return this.client.get(endpoint, config)
|
|
|
|
|
|
+ return ((this.redis && !refresh) ?
|
|
|
|
+ this.redis.waitForProcess(`webshot/${id}`, this.webshotDelay * 4)
|
|
|
|
+ .then(() => this.redis.getContent(`webshot/${id}`))
|
|
|
|
+ .then(content => {
|
|
|
|
+ if (content === null) throw Error();
|
|
|
|
+ return {id_str: id} as Tweet;
|
|
|
|
+ }) :
|
|
|
|
+ Promise.reject())
|
|
|
|
+ .catch(() => this.client.get(endpoint, config))
|
|
.then((tweet: Tweet) => {
|
|
.then((tweet: Tweet) => {
|
|
- logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
|
|
|
|
|
|
+ if (tweet.id) {
|
|
|
|
+ logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
|
|
|
|
+ } else {
|
|
|
|
+ logger.debug(`skipped querying api as this tweet has been cached`)
|
|
|
|
+ }
|
|
return this.workOnTweets([tweet], sender, refresh);
|
|
return this.workOnTweets([tweet], sender, refresh);
|
|
});
|
|
});
|
|
};
|
|
};
|
|
|
|
|
|
private sendTweets = (
|
|
private sendTweets = (
|
|
- config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean} = {reportOnSkip: false, force: false},
|
|
|
|
|
|
+ config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean}
|
|
|
|
+ = {reportOnSkip: false, force: false},
|
|
...to: IChat[]
|
|
...to: IChat[]
|
|
) => (id: string, msg: string, text: string, author: string) => {
|
|
) => (id: string, msg: string, text: string, author: string) => {
|
|
to.forEach(subscriber => {
|
|
to.forEach(subscriber => {
|
|
|
|
+ const [twId, rtId] = (/(\d+)(?:,rt:(\d+))?/.exec(id) || []).slice(1);
|
|
const {sourceInfo: source, reportOnSkip, force} = config;
|
|
const {sourceInfo: source, reportOnSkip, force} = config;
|
|
const targetStr = JSON.stringify(subscriber);
|
|
const targetStr = JSON.stringify(subscriber);
|
|
const send = () => retryOnError(
|
|
const send = () => retryOnError(
|
|
@@ -330,16 +363,16 @@ export default class {
|
|
}
|
|
}
|
|
).then(() => {
|
|
).then(() => {
|
|
if (this.redis) {
|
|
if (this.redis) {
|
|
- logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
|
|
|
|
- return this.redis.cacheForChat(id, subscriber);
|
|
|
|
|
|
+ logger.info(`caching push status of tweet ${rtId ? `${rtId} (RT)` : twId} for ${targetStr}...`);
|
|
|
|
+ return this.redis.cacheForChat(rtId || twId, subscriber);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
- ((this.redis && !force) ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
|
|
|
|
|
|
+ ((this.redis && !force) ? this.redis.isCachedForChat(rtId || twId, subscriber) : Promise.resolve(false))
|
|
.then(isCached => {
|
|
.then(isCached => {
|
|
if (isCached) {
|
|
if (isCached) {
|
|
- logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
|
|
|
|
|
|
+ logger.info(`skipped subscriber ${targetStr} as tweet ${rtId ? `${rtId} (or its RT)` : twId} has been sent already`);
|
|
if (!reportOnSkip) return;
|
|
if (!reportOnSkip) return;
|
|
- text = `[最近发送过的推文:${id}]`;
|
|
|
|
|
|
+ text = `[最近发送过的推文:${rtId || twId}]`;
|
|
msg = author + text;
|
|
msg = author + text;
|
|
}
|
|
}
|
|
logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
|
|
logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
|
|
@@ -405,8 +438,8 @@ export default class {
|
|
if (endpoint) {
|
|
if (endpoint) {
|
|
const offset = lock.threads[currentFeed].offset;
|
|
const offset = lock.threads[currentFeed].offset;
|
|
if (offset as unknown as number > 0) config.since_id = offset;
|
|
if (offset as unknown as number > 0) config.since_id = offset;
|
|
- const getMore = (gotTweets: Tweets = []) => this.client.get(
|
|
|
|
- endpoint, config, (error: {[key: string]: any}[], tweets: Tweets
|
|
|
|
|
|
+ const getMore = (gotTweets: Tweet[] = []) => this.client.get(
|
|
|
|
+ endpoint, config, (error: {[key: string]: any}[], tweets: Tweet[]
|
|
) => {
|
|
) => {
|
|
if (error) {
|
|
if (error) {
|
|
if (error instanceof Array && error.length > 0 && error[0].code === 34) {
|
|
if (error instanceof Array && error.length > 0 && error[0].code === 34) {
|
|
@@ -427,7 +460,7 @@ export default class {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
- promise.then((tweets: Tweets) => {
|
|
|
|
|
|
+ promise.then((tweets: Tweet[]) => {
|
|
logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
|
|
logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
|
|
const currentThread = lock.threads[currentFeed];
|
|
const currentThread = lock.threads[currentFeed];
|
|
|
|
|