twitter.ts 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. import * as fs from 'fs';
  2. import * as path from 'path';
  3. import * as Twitter from 'twitter';
  4. import TwitterTypes from 'twitter-d';
  5. import { getLogger } from './loggers';
  6. import QQBot from './koishi';
  7. import RedisSvc from './redis';
  8. import { chainPromises, BigNumOps } from './utils';
  9. import Webshot from './webshot';
  /**
   * Options accepted by the constructor of the default-exported worker class.
   */
  interface IWorkerOption {
    // Subscription state (feed list, threads, offsets) and the file it is persisted to.
    lock: ILock;
    lockfile: string;

    // Bot used to deliver messages to chats.
    bot: QQBot;

    // Seconds between polling rounds; work() clamps it to at least 1.
    workInterval: number;

    // Forwarded as the last argument of every webshot rendering call.
    webshotDelay: number;

    // Twitter API credentials handed to the `twitter` client.
    consumerKey: string;
    consumerSecret: string;
    accessTokenKey: string;
    accessTokenSecret: string;

    // Passed through to the Webshot constructor; their meaning is defined by Webshot.
    mode: number;
    wsUrl: string;

    // Optional Redis connection; when present, rendered tweets and push status are cached.
    redis?: IRedisConfig;
  }
  /**
   * Helpers that turn user-supplied Twitter handles into a canonical form.
   */
  export class ScreenNameNormalizer {
    /**
     * Live lookup hook returning the screen name for a handle. The worker's
     * constructor installs it; while it is unset, `normalizeLive` falls back to
     * the offline `normalize`.
     */
    // tslint:disable-next-line: variable-name
    public static _queryUser: (username: string) => Promise<string>;

    /** Offline normalization: lower-case the handle and strip one leading `@`. */
    public static normalize = (username: string) => username.toLowerCase().replace(/^@/, '');

    /**
     * Resolves `username` through the live lookup hook when one is installed.
     *
     * @returns the screen name reported by the lookup; `null` when the lookup
     *   rejects with API error code 50 (user not found); `username` unchanged when
     *   the lookup fails for any other reason (logged as a warning); or
     *   `normalize(username)` when no lookup hook is installed.
     */
    public static async normalizeLive(username: string) {
      // Use the class name rather than `this` so a detached reference
      // (e.g. `const f = ScreenNameNormalizer.normalizeLive`) keeps working.
      if (!ScreenNameNormalizer._queryUser) return ScreenNameNormalizer.normalize(username);
      return await ScreenNameNormalizer._queryUser(username)
        .catch((err: unknown) => {
          // API failures reject with an array of {code, message}; other failures
          // (network errors, exceptions) are not arrays and must not be indexed blindly.
          const first = Array.isArray(err) ? err[0] as {code?: number, message?: string} | undefined : undefined;
          if (first?.code === 50) return null;
          logger.warn(`error looking up user: ${first?.message ?? String(err)}`);
          return username;
        });
    }
  }
  /**
   * Sends one tweet (by ID, or a `last...@user` query) to `receiver`.
   *
   * Placeholder only: the worker's constructor reassigns this binding to the real
   * implementation. Calling it earlier is a programming error, so it throws with a
   * message that says so instead of a bare, message-less Error.
   */
  export let sendTweet = (id: string, receiver: IChat, forceRefresh: boolean): void => {
    throw new Error('sendTweet called before the twitter worker was constructed');
  };
  /**
   * Parameters of a timeline query.
   *
   * `since` / `until` are tweet ID strings and are sent to the API as
   * `since_id` / `max_id`. `count` caps how many tweets are returned; there is no
   * cap when it is omitted. When unset, `noreps` defaults to excluding replies and
   * `norts` defaults to keeping retweets.
   */
  export interface ITimelineQueryConfig {
    username: string;  // with or without a leading '@'
    count?: number;
    since?: string;
    until?: string;
    noreps?: boolean;
    norts?: boolean;
  }
  /**
   * Queries a timeline and reports the matching tweets to `receiver`.
   * All query fields arrive as strings (they come from chat commands).
   *
   * Placeholder only: the worker's constructor reassigns this binding to the real
   * implementation. Calling it earlier is a programming error, so it throws a
   * descriptive Error.
   */
  export let sendTimeline = (
    conf: {[key in keyof ITimelineQueryConfig]: string},
    receiver: IChat
  ): void => {
    throw new Error('sendTimeline called before the twitter worker was constructed');
  };
  // Twitter's snowflake epoch, in Unix milliseconds.
  const TWITTER_EPOCH = 1288834974657;

  /**
   * Converts a Unix timestamp in milliseconds into a tweet-ID bound.
   * The offset from TWITTER_EPOCH (minus one millisecond) is shifted left by 22 bits
   * into the snowflake's time field. Returns undefined for NaN input, such as the
   * result of an unparseable date.
   */
  const snowflake = (timestampMs: number) => {
    if (Number.isNaN(timestampMs)) return undefined;
    const sinceEpoch = timestampMs - 1 - TWITTER_EPOCH;
    return BigNumOps.lShift(String(sinceEpoch), 22);
  };

  const logger = getLogger('twitter');

  // Delivery retry policy used by sendTweets together with retryOnError.
  const maxTrials = 3;         // retries of the rich message before falling back to plain text
  const retryInterval = 1500;  // delay before each retry, in milliseconds
  /**
   * English ordinal for `n`: 1 -> "1st", 2 -> "2nd", 3 -> "3rd", 11..13 -> "th", else "th".
   * Used only to phrase retry log messages.
   */
  const ordinal = (n: number) => {
    const suffixByLastDigit: {[digit: number]: string} = {1: 'st', 2: 'nd', 3: 'rd'};
    // Numbers whose tens digit is 1 (11, 12, 113, ...) always take "th".
    const inTeens = Math.trunc(n / 10) % 10 === 1;
    const suffix = inTeens ? 'th' : (suffixByLastDigit[n % 10] ?? 'th');
    return `${n}${suffix}`;
  };
  /**
   * Runs `doWork` until it succeeds, waiting `retryInterval` ms after each failure.
   *
   * Before every retry, `onRetry(error, count, terminate)` is called with the last
   * error and a 1-based retry counter. Calling `terminate(value)` stops retrying and
   * resolves with `value` instead. The returned promise is only ever resolved
   * (by success or by `terminate`), never rejected by this helper.
   */
  const retryOnError = <T, U>(
    doWork: () => Promise<T>,
    onRetry: (error, count: number, terminate: (defaultValue: U) => void) => void
  ) => new Promise<T | U>(resolve => {
    const attempt = (failuresSoFar: number) => {
      doWork().then(resolve).catch(error => scheduleRetry(error, failuresSoFar + 1));
    };
    const scheduleRetry = (lastError: unknown, retryNo: number) => {
      setTimeout(() => {
        let stopped = false;
        const terminate = (defaultValue: U) => {
          stopped = true;
          resolve(defaultValue);
        };
        onRetry(lastError, retryNo, terminate);
        if (!stopped) attempt(retryNo);
      }, retryInterval);
    };
    attempt(0);
  });
  // Re-exports of the twitter-d shapes used by other modules.
  export type FullUser = TwitterTypes.FullUser;
  export type Entities = TwitterTypes.Entities;
  export type ExtendedEntities = TwitterTypes.ExtendedEntities;
  export type MediaEntity = TwitterTypes.MediaEntity;

  /**
   * A status as this module types it: `user` is declared as a FullUser, and a
   * retweet's original is itself a Tweet.
   * NOTE(review): queryTimeline requests `trim_user: true`, so `user` on those
   * results is probably not a full object at runtime — confirm before relying on it.
   */
  interface ITweet extends TwitterTypes.Status {
    user: FullUser;
    retweeted_status?: Tweet;
  }

  export type Tweet = ITweet;
  export type Tweets = Tweet[];
  /**
   * Twitter polling worker.
   *
   * Constructing an instance wires the module-level hooks (`sendTweet`,
   * `sendTimeline`, `ScreenNameNormalizer._queryUser`) to this instance.
   * `launch()` creates the Webshot renderer, whose callback schedules `work()`.
   * `work()` round-robins over `lock.feed`: it fetches new tweets for one feed per
   * round, pushes them to that feed's subscribers, and persists the lock.
   */
  export default class {
    private client: Twitter;
    private lock: ILock;
    private lockfile: string;
    private workInterval: number;
    private bot: QQBot;
    private webshotDelay: number;
    private webshot: Webshot;
    private mode: number;
    private wsUrl: string;
    private redis: RedisSvc;

    constructor(opt: IWorkerOption) {
      this.client = new Twitter({
        consumer_key: opt.consumerKey,
        consumer_secret: opt.consumerSecret,
        access_token_key: opt.accessTokenKey,
        access_token_secret: opt.accessTokenSecret,
      });
      this.lockfile = opt.lockfile;
      this.lock = opt.lock;
      this.workInterval = opt.workInterval;
      this.bot = opt.bot;
      this.webshotDelay = opt.webshotDelay;
      this.mode = opt.mode;
      this.wsUrl = opt.wsUrl;
      // Redis is optional; without it nothing is cached and pushes are not deduplicated.
      if (opt.redis) this.redis = new RedisSvc(opt.redis);
      ScreenNameNormalizer._queryUser = this.queryUser;

      // `idOrQuery` is either a tweet ID or `last[-N]@user[,noreps=on|off][,norts=on|off]`.
      // The latter fetches N+1 timeline tweets and shows the last one fetched.
      sendTweet = (idOrQuery, receiver, forceRefresh) => {
        const match = /^last(|-\d+)@([^\/?#,]+)((?:,no.*?=[^,]*)*)$/.exec(idOrQuery);
        const query = () => this.queryTimeline({
          username: match[2],
          count: 1 - Number(match[1]),
          noreps: {on: true, off: false}[match[3].replace(/.*,noreps=([^,]*).*/, '$1')],
          norts: {on: true, off: false}[match[3].replace(/.*,norts=([^,]*).*/, '$1')],
        }).then(tweets => (tweets.length ? tweets[tweets.length - 1].id_str : undefined));
        (match ? query() : Promise.resolve(idOrQuery))
          .then((id: string | undefined) => {
            // The filtered timeline was empty; nothing to show.
            if (id === undefined) return this.bot.sendTo(receiver, '找不到请求的推文,它可能已被删除。');
            return this.getTweet(
              id,
              this.sendTweets({sourceInfo: `tweet ${id}`, reportOnSkip: true, force: forceRefresh}, receiver),
              forceRefresh
            );
          })
          .catch((err: unknown) => {
            const {code, message} = this.firstApiError(err);
            // 34: user not found (only meaningful for `last@user` queries).
            if (code === 34 && match) {
              return this.bot.sendTo(receiver, `找不到用户 ${match[2].replace(/^@?(.*)$/, '@$1')}。`);
            }
            // Anything but 144 (no such status) is an unexpected failure: report it
            // and stop, rather than also sending the "not found" message below.
            if (code !== 144) {
              logger.warn(`error retrieving tweet: ${message}`);
              return this.bot.sendTo(receiver, `获取推文时出现错误:${message}`);
            }
            return this.bot.sendTo(receiver, '找不到请求的推文,它可能已被删除。');
          });
      };

      // All fields arrive as strings.
      // count: default 10; a negative count queries forward from `since`.
      // since/until: a tweet ID, or else anything `new Date` can parse.
      // noreps/norts: 'on' | 'off'.
      sendTimeline = ({username, count, since, until, noreps, norts}, receiver) => {
        const countNum = Number(count) || 10;
        (countNum > 0 ? this.queryTimeline : this.queryTimelineReverse)({
          username,
          count: Math.abs(countNum),
          since: BigNumOps.parse(since) || snowflake(new Date(since).getTime()),
          until: BigNumOps.parse(until) || snowflake(new Date(until).getTime()),
          noreps: {on: true, off: false}[noreps],
          norts: {on: true, off: false}[norts],
        })
          .then(tweets => chainPromises(
            tweets.map(tweet => () => this.bot.sendTo(receiver, `\
编号:${tweet.id_str}
时间:${tweet.created_at}
媒体:${tweet.extended_entities ? '有' : '无'}
正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`
            ))
              .concat(() => this.bot.sendTo(receiver, tweets.length ?
                '时间线查询完毕,使用 /twitter_view <编号> 查看推文详细内容。' :
                '时间线查询完毕,没有找到符合条件的推文。'
              ))
          ))
          .catch((err: unknown) => {
            const {code, message} = this.firstApiError(err);
            if (code !== 34) {
              logger.warn(`error retrieving timeline: ${message}`);
              return this.bot.sendTo(receiver, `获取时间线时出现错误:${message}`);
            }
            return this.bot.sendTo(receiver, `找不到用户 ${username.replace(/^@?(.*)$/, '@$1')}。`);
          });
      };
    }

    /**
     * Creates the Webshot renderer. The callback passed to it schedules the first
     * work() round after `workInterval` seconds; presumably Webshot invokes it once
     * it is ready (TODO confirm against Webshot).
     */
    public launch = () => {
      this.webshot = new Webshot(
        this.wsUrl,
        this.mode,
        () => setTimeout(this.work, this.workInterval * 1000)
      );
    };

    /** Looks up `username` via `users/show` and resolves to its screen name. */
    public queryUser = (username: string) => this.client.get('users/show', {screen_name: username})
      .then((user: FullUser) => user.screen_name);

    /**
     * Returns up to `count` of the earliest tweets after `conf.since`, in the same
     * newest-first order as queryTimeline. It walks forward in windows of roughly one
     * week of snowflake IDs (7 days of ms shifted by 22 bits), bounded by `conf.until`.
     * Without `since`, this is plain queryTimeline.
     */
    public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
      if (!conf.since) return this.queryTimeline(conf);
      const count = conf.count;
      const maxID = conf.until;
      // Work on a copy: the windowing below rewrites count/since/until, and the
      // caller's object should not change under it.
      const cursor: ITimelineQueryConfig = {...conf, count: undefined};
      const nextUntil = () => BigNumOps.min(maxID, BigNumOps.plus(cursor.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
      cursor.until = nextUntil();
      const fetchWindow = (tweets: ITweet[]): Promise<ITweet[]> => this.queryTimeline(cursor).then(newTweets => {
        // Later windows hold newer tweets, so they go in front.
        tweets = newTweets.concat(tweets);
        cursor.since = cursor.until;
        cursor.until = nextUntil();
        if (
          tweets.length >= count ||
          BigNumOps.compare(cursor.since, cursor.until) >= 0
        ) {
          return tweets.slice(-count);
        }
        return fetchWindow(tweets);
      });
      return fetchWindow([]);
    };

    /**
     * Pages backwards through `statuses/user_timeline` until `count` tweets are
     * collected or a page comes back empty (no limit when `count` is undefined).
     * Replies are excluded unless `noreps === false`; retweets are included unless
     * `norts` is true. Requests use `trim_user: true`.
     */
    public queryTimeline = (
      { username, count, since, until, noreps, norts }: ITimelineQueryConfig
    ) => {
      username = username.replace(/^@?(.*)$/, '@$1');
      logger.info(`querying timeline of ${username} with config: ${
        JSON.stringify({
          ...(count && {count}),
          ...(since && {since}),
          ...(until && {until}),
          ...(noreps && {noreps}),
          ...(norts && {norts}),
        })}`);
      const fetchTimeline = (
        config = {
          screen_name: username.slice(1),
          trim_user: true,
          exclude_replies: noreps ?? true,
          include_rts: !(norts ?? false),
          since_id: since,
          max_id: until,
          tweet_mode: 'extended',
        },
        tweets: ITweet[] = []
      ): Promise<ITweet[]> => this.client.get('statuses/user_timeline', config)
        .then((newTweets: ITweet[]) => {
          if (newTweets.length) {
            logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
            // max_id is inclusive; step one below the oldest tweet received.
            config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
            logger.info(`timeline query of ${username} yielded ${
              newTweets.length
            } new tweets, next query will start at offset ${config.max_id}`);
            tweets.push(...newTweets);
          }
          if (!newTweets.length || tweets.length >= count) {
            logger.info(`timeline query of ${username} finished successfully, ${
              tweets.length
            } tweets have been fetched`);
            return tweets.slice(0, count);
          }
          return fetchTimeline(config, tweets);
        });
      return fetchTimeline();
    };

    /**
     * Renders and dispatches each tweet. Unless `refresh` is set, a cached rendering
     * is taken from Redis (`webshot/<id>`). On a cache miss (or no Redis), the tweet
     * is rendered through `this.webshot`, cached when Redis is available, and then
     * handed to `sendTweets`.
     */
    private workOnTweets = (
      tweets: Tweets,
      sendTweets: (id: string, msg: string, text: string, author: string) => void,
      refresh = false
    ) => Promise.all(tweets.map(tweet =>
      ((this.redis && !refresh) ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
        .then(content => {
          if (content === null) throw Error();
          logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
          const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
          sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
        }).catch(() =>
          this.webshot([tweet], (id: string, msg: string, text: string, author: string) => {
            Promise.resolve()
              .then(() => {
                if (!this.redis) return;
                logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
                this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({msg, text, author}));
              })
              .then(() => sendTweets(id, msg, text, author));
          }, this.webshotDelay)
        )
    ));

    /** Fetches a single tweet via `statuses/show` and runs it through workOnTweets. */
    public getTweet = (id: string, sender: (id: string, msg: string, text: string, author: string) => void, refresh = false) => {
      const endpoint = 'statuses/show';
      const config = {
        id,
        tweet_mode: 'extended',
      };
      return this.client.get(endpoint, config)
        .then((tweet: Tweet) => {
          logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
          return this.workOnTweets([tweet], sender, refresh);
        });
    };

    /**
     * Normalizes a rejection reason. The code in this class treats Twitter API
     * failures as arrays of `{code, message}`; any other value (an Error, a string,
     * ...) yields no code and its string form as the message.
     */
    private firstApiError(err: unknown): {code?: number, message: string} {
      const first: unknown = Array.isArray(err) ? err[0] : undefined;
      if (first && typeof first === 'object') {
        const {code, message} = first as {code?: number, message?: string};
        return {code, message: message || JSON.stringify(first)};
      }
      return {message: String(err)};
    }

    /**
     * Builds a callback that pushes a rendered tweet to every chat in `to`.
     *
     * Per subscriber, unless `force` is set (and only when Redis is available),
     * already-delivered tweets are skipped. With `reportOnSkip`, a short notice is
     * sent instead. The message is retried every `retryInterval` ms; after
     * `maxTrials` retries it falls back to plain text (`author + text`). Successful
     * pushes are recorded in Redis.
     */
    private sendTweets = (
      config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean} = {reportOnSkip: false, force: false},
      ...to: IChat[]
    ) => (id: string, msg: string, text: string, author: string) => {
      const {sourceInfo: source, reportOnSkip, force} = config;
      to.forEach(subscriber => {
        const targetStr = JSON.stringify(subscriber);
        // Per-subscriber copies: the skip notice must not leak into the message
        // delivered to other subscribers of the same tweet.
        let subscriberMsg = msg;
        let subscriberText = text;
        const send = () => retryOnError(
          () => this.bot.sendTo(subscriber, subscriberMsg),
          (_, count, terminate: (doNothing: Promise<void>) => void) => {
            if (count <= maxTrials) {
              logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
            } else {
              logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
              terminate(this.bot.sendTo(subscriber, author + subscriberText, true));
            }
          }
        ).then(() => {
          if (this.redis) {
            logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
            return this.redis.cacheForChat(id, subscriber);
          }
        });
        ((this.redis && !force) ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
          .then(isCached => {
            if (isCached) {
              logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
              if (!reportOnSkip) return;
              subscriberText = `[最近发送过的推文:${id}]`;
              subscriberMsg = author + subscriberText;
            }
            logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
            return send();
          })
          // Nothing awaits this chain; log instead of leaving the rejection unhandled.
          .catch(err => logger.error(`failed to push data${source ? ` of ${source}` : ''} to ${targetStr}: ${this.firstApiError(err).message}`));
      });
    };

    /**
     * One polling round for `lock.feed[lock.workon]`, after which the next round is
     * rescheduled. Feeds without subscribers are dropped. A thread offset of '-1'
     * only records the newest tweet ID; '0' pushes just the newest tweet. A positive
     * offset is used as `since_id`.
     */
    public work = () => {
      const lock = this.lock;
      if (this.workInterval < 1) this.workInterval = 1;
      if (lock.feed.length === 0) {
        setTimeout(() => {
          this.work();
        }, this.workInterval * 1000);
        return;
      }
      if (lock.workon >= lock.feed.length) lock.workon = 0;
      if (!lock.threads[lock.feed[lock.workon]] ||
        !lock.threads[lock.feed[lock.workon]].subscribers ||
        lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
        logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
        delete lock.threads[lock.feed[lock.workon]];
        lock.feed.splice(lock.workon, 1);
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        this.work();
        return;
      }
      const currentFeed = lock.feed[lock.workon];
      logger.debug(`pulling feed ${currentFeed}`);
      const promise = new Promise<Tweets>(resolve => {
        let match = /https:\/\/twitter.com\/([^\/]+)\/lists\/([^\/]+)/.exec(currentFeed);
        let config: {[key: string]: any};
        let endpoint: string;
        if (match) {
          if (match[1] === 'i') {
            config = {
              list_id: match[2],
              tweet_mode: 'extended',
            };
          } else {
            config = {
              owner_screen_name: match[1],
              slug: match[2],
              tweet_mode: 'extended',
            };
          }
          endpoint = 'lists/statuses';
        } else {
          match = /https:\/\/twitter.com\/([^\/]+)/.exec(currentFeed);
          if (match) {
            config = {
              screen_name: match[1],
              exclude_replies: false,
              tweet_mode: 'extended',
            };
            endpoint = 'statuses/user_timeline';
          }
        }
        if (!endpoint) {
          // Settle anyway: a promise that never settles would stop the polling loop for good.
          logger.warn(`unrecognized feed ${currentFeed}, skipping`);
          resolve([]);
          return;
        }
        const offset = lock.threads[currentFeed].offset;
        if (offset as unknown as number > 0) config.since_id = offset;
        const getMore = (gotTweets: Tweets = []) => this.client.get(
          endpoint, config, (error: {[key: string]: any}[], tweets: Tweets
        ) => {
          if (error) {
            if (error instanceof Array && error.length > 0 && error[0].code === 34) {
              logger.warn(`error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
              (lock.threads[currentFeed]?.subscribers ?? []).forEach(subscriber => {
                logger.info(`sending notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}`);
                // `.catch()` without a handler would leave the rejection unhandled.
                this.bot.sendTo(subscriber, `链接 ${currentFeed} 指向的用户或列表不存在,请退订。`)
                  .catch(err => logger.warn(`failed to send notfound message to ${JSON.stringify(subscriber)}: ${this.firstApiError(err).message}`));
              });
            } else {
              logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
            }
          }
          if (!Array.isArray(tweets) || tweets.length === 0) return resolve(gotTweets);
          // max_id is inclusive: step one below the oldest tweet received, so that
          // tweet is not fetched (and pushed) twice and a single new tweet is kept.
          config.max_id = BigNumOps.plus('-1', tweets[tweets.length - 1].id_str);
          getMore(gotTweets.concat(tweets));
        });
        getMore();
      });
      promise.then((tweets: Tweets) => {
        logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
        const currentThread = lock.threads[currentFeed];
        // The thread may have been removed while the request was in flight.
        if (!currentThread) return;
        const updateDate = () => currentThread.updatedAt = new Date().toString();
        if (!tweets || tweets.length === 0) { updateDate(); return; }
        const topOfFeed = tweets[0].id_str;
        const updateOffset = () => currentThread.offset = topOfFeed;
        if (currentThread.offset === '-1') { updateOffset(); return; }
        if (currentThread.offset === '0') tweets.splice(1);
        return this.workOnTweets(tweets, this.sendTweets({sourceInfo: `thread ${currentFeed}`}, ...currentThread.subscribers))
          .then(updateDate).then(updateOffset);
      })
        // A failure must not skip the rescheduling below, or polling stops forever.
        // The offset is left unchanged, so this feed is retried next round.
        .catch(err => logger.error(`error processing feed ${currentFeed}: ${this.firstApiError(err).message}`))
        .then(() => {
          lock.workon++;
          let timeout = this.workInterval * 1000 / lock.feed.length;
          if (timeout < 1000) timeout = 1000;
          fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
          setTimeout(() => {
            this.work();
          }, timeout);
        });
    };
  }