twitter.ts 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. import * as Twitter from 'twitter';
  2. import TwitterTypes from 'twitter-d';
  3. import { getLogger } from './loggers';
  4. import { BigNumOps } from './utils';
  /**
   * Credentials required to build the underlying `twitter` API client.
   * Each field is handed to the client constructor under its snake_case name.
   */
  interface IWorkerOption {
    /** Supplied to the client as `consumer_key`. */
    consumerKey: string;

    /** Supplied to the client as `consumer_secret`. */
    consumerSecret: string;

    /** Supplied to the client as `access_token_key`. */
    accessTokenKey: string;

    /** Supplied to the client as `access_token_secret`. */
    accessTokenSecret: string;
  }
  /**
   * Options accepted by the timeline query methods of the default-exported class.
   */
  export interface ITimelineQueryConfig {
    /** Screen name of the account to query; a leading `@` is optional. */
    username: string;

    /**
     * Upper bound on the number of tweets returned. When omitted, the result
     * of a single timeline request is returned as-is.
     */
    count?: number;

    /** Tweet id sent to the API as `since_id`. */
    since?: string;

    /** Tweet id sent to the API as `max_id`. */
    until?: string;

    /** Sent to the API as `exclude_replies`; treated as `true` when omitted. */
    noreps?: boolean;

    /** When set, retweets are excluded (`include_rts` is sent as its negation); treated as `false` when omitted. */
    norts?: boolean;
  }
  /** Millisecond timestamp subtracted from an epoch before it is turned into an id. */
  const TWITTER_EPOCH = 1288834974657;

  /**
   * Turns a millisecond timestamp into an id string that can be compared with
   * tweet ids: `(epoch - 1 - TWITTER_EPOCH)` shifted left by 22 bits.
   *
   * @param epoch Timestamp in milliseconds.
   * @returns The shifted value produced by `BigNumOps.lShift`, or `undefined`
   *          when `epoch` is `NaN`.
   */
  export const snowflake = (epoch: number) => {
    if (Number.isNaN(epoch)) {
      return undefined;
    }
    const elapsed = epoch - 1 - TWITTER_EPOCH;
    return BigNumOps.lShift(String(elapsed), 22);
  };
  // Module-wide logger; every message emitted from this file goes through the 'twitter' logger.
  const logger = getLogger('twitter');
  // Re-exports of the `twitter-d` payload types so that consumers of this
  // module do not need to import `twitter-d` themselves.

  /** Alias of `TwitterTypes.FullUser`. */
  export type FullUser = TwitterTypes.FullUser;

  /** Alias of `TwitterTypes.Entities`. */
  export type Entities = TwitterTypes.Entities;

  /** Alias of `TwitterTypes.ExtendedEntities`. */
  export type ExtendedEntities = TwitterTypes.ExtendedEntities;

  /** Alias of `TwitterTypes.MediaEntity`. */
  export type MediaEntity = TwitterTypes.MediaEntity;
  /**
   * A `TwitterTypes.Status` whose `user` is typed as a `FullUser` and whose
   * optional `retweeted_status` is itself a tweet of this same shape.
   */
  interface ITweet extends TwitterTypes.Status {
    user: FullUser;
    retweeted_status?: ITweet;
  }

  /** Public name of the tweet shape used throughout this module. */
  export type Tweet = ITweet;

  /** A list of tweets. */
  export type Tweets = Tweet[];
  /**
   * Searches a user's timeline for the first tweet matching `regexp`.
   *
   * This initial value is only a placeholder that resolves to `null`; the
   * constructor of the default-exported class reassigns the binding with a
   * working implementation.
   *
   * @param username     Screen name of the account to search.
   * @param regexp       Pattern to look for in tweet text.
   * @param cacheSeconds Time window, in seconds, used by the real implementation.
   * @param until        Tweet id to start searching from, used by the real implementation.
   */
  export let queryByRegExp = (
    username: string,
    regexp: RegExp,
    cacheSeconds?: number,
    until?: string
  ) => Promise.resolve<RegExpExecArray>(null);
  /**
   * Worker around the `twitter` REST client that fetches user timelines.
   *
   * Constructing an instance also installs the module-level `queryByRegExp`
   * implementation, bound to this instance's client and match cache.
   */
  export default class TwitterClient {
    private client: Twitter;

    /** Latest successful match per `username:regexp` key, with the id of the tweet it came from. */
    private lastQueries: {[key: string]: {match: RegExpExecArray, id: string}} = {};

    /**
     * @param opt API credentials forwarded to the `twitter` client.
     */
    constructor(opt: IWorkerOption) {
      this.client = new Twitter({
        consumer_key: opt.consumerKey,
        consumer_secret: opt.consumerSecret,
        access_token_key: opt.accessTokenKey,
        access_token_secret: opt.accessTokenSecret,
      });

      /**
       * Pages backwards through a timeline until a tweet matching `regexp` is
       * found, or until the tweets reached are older than `cacheSeconds`.
       * Resolves to the match, or `null` when nothing matched.
       */
      const search = (
        username: string,
        regexp: RegExp,
        cacheSeconds?: number,
        until?: string
      ): Promise<RegExpExecArray | null> => {
        logger.info(`searching timeline of @${username} for matches of ${regexp}...`);
        const normalizedUsername = username.toLowerCase().replace(/^@/, '');
        const queryKey = `${normalizedUsername}:${regexp.toString()}`;

        // A tweet id is "old" when it is not newer than the id corresponding
        // to `cacheSeconds` ago.
        // NOTE(review): when cacheSeconds is omitted the timestamp is NaN and
        // snowflake() yields undefined; the outcome then depends on how
        // BigNumOps.compare treats an undefined operand - confirm with callers.
        const isOld = (then: string) => {
          if (!then) return true;
          const threshold = snowflake(Date.now() - Number(cacheSeconds) * 1000);
          return BigNumOps.compare(threshold, then) >= 0;
        };

        if (queryKey in this.lastQueries && !isOld(this.lastQueries[queryKey].id)) {
          const {match, id} = this.lastQueries[queryKey];
          logger.info(`found match ${JSON.stringify(match)} from cached tweet of id ${id}`);
          return Promise.resolve(match);
        }

        return this.queryTimeline({username, norts: true, until}).then(tweets => {
          for (const tweet of tweets) {
            // Global/sticky regexes keep state in lastIndex; reset it so every
            // tweet is matched from the start and a hit is never lost.
            regexp.lastIndex = 0;
            const match = regexp.exec(tweet.full_text);
            regexp.lastIndex = 0;
            if (!match) continue;
            this.lastQueries[queryKey] = {match, id: tweet.id_str};
            logger.info(`found match ${JSON.stringify(match)} in tweet of id ${tweet.id_str} from timeline`);
            return match;
          }

          // Nothing left to page through.
          if (!tweets.length) return null;

          const last = tweets[tweets.length - 1].id_str;
          if (isOld(last)) return null;

          // max_id is inclusive, so continue strictly below the last tweet seen;
          // otherwise a page holding only that tweet would be refetched forever.
          return search(username, regexp, cacheSeconds, BigNumOps.plus('-1', last));
        });
      };

      queryByRegExp = search;
    }

    /**
     * Looks a user up through `users/show`.
     *
     * @param username Screen name to look up.
     * @returns The `screen_name` reported by the API.
     */
    public queryUser = (username: string) => this.client.get('users/show', {screen_name: username})
      .then((user: FullUser) => user.screen_name);

    /**
     * Collects tweets moving forward in time from `conf.since`, querying one
     * id window at a time (each window spans at most 7 days' worth of ids,
     * capped at `conf.until`). Falls back to `queryTimeline` when `conf.since`
     * is not set.
     *
     * The caller's `conf` object is left untouched.
     *
     * @param conf Timeline query options.
     * @returns The accumulated tweets, trimmed to the last `count` entries when
     *          `count` is given (later windows are prepended to earlier ones).
     */
    public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
      if (!conf.since) return this.queryTimeline(conf);

      const count = conf.count;
      const maxID = conf.until;
      // 7 days in milliseconds, scaled by the same 22-bit shift snowflake() applies.
      const windowSize = String(7 * 24 * 3600 * 1000 * 2 ** 22);

      // Work on a private copy: each window is fetched without a count limit.
      const query: ITimelineQueryConfig = {...conf, count: undefined};
      const nextUntil = () => BigNumOps.min(maxID, BigNumOps.plus(query.since, windowSize));
      query.until = nextUntil();

      const collect = (tweets: ITweet[]): Promise<ITweet[]> =>
        this.queryTimeline(query).then(newTweets => {
          const merged = newTweets.concat(tweets);

          // Slide the window forward.
          query.since = query.until;
          query.until = nextUntil();

          const enough = count !== undefined && merged.length >= count;
          const exhausted = BigNumOps.compare(query.since, query.until) >= 0;
          if (enough || exhausted) {
            return count === undefined ? merged : merged.slice(-count);
          }
          return collect(merged);
        });

      return collect([]);
    };

    /**
     * Fetches a user's timeline through `statuses/user_timeline`, newest page
     * first, requesting further pages until `count` tweets are gathered or a
     * page comes back empty. Without `count`, only the first page is returned.
     *
     * @returns The fetched tweets, truncated to `count` when it is given.
     */
    public queryTimeline = (
      { username, count, since, until, noreps, norts }: ITimelineQueryConfig
    ) => {
      // Normalise to exactly one leading '@' for logging; the API gets the bare name.
      username = username.replace(/^@?(.*)$/, '@$1');
      logger.info(`querying timeline of ${username} with config: ${
        JSON.stringify({
          ...(count && {count}),
          ...(since && {since}),
          ...(until && {until}),
          ...(noreps && {noreps}),
          ...(norts && {norts}),
        })}`);

      const fetchTimeline = (
        config = {
          screen_name: username.slice(1),
          trim_user: true,
          exclude_replies: noreps ?? true,
          include_rts: !(norts ?? false),
          since_id: since,
          max_id: until,
          tweet_mode: 'extended',
        },
        tweets: ITweet[] = []
      ): Promise<ITweet[]> => this.client.get('statuses/user_timeline', config)
        .then((newTweets: ITweet[]) => {
          if (newTweets.length) {
            logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
            // The next page starts just below the oldest tweet of this page.
            config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
            logger.info(`timeline query of ${username} yielded ${
              newTweets.length
            } new tweets, next query will start at offset ${config.max_id}`);
            tweets.push(...newTweets);
          }
          if (!newTweets.length || count === undefined || tweets.length >= count) {
            logger.info(`timeline query of ${username} finished successfully, ${
              tweets.length
            } tweets have been fetched`);
            return tweets.slice(0, count);
          }
          return fetchTimeline(config, tweets);
        });

      return fetchTimeline();
    };
  }