twitter.ts 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. import * as fs from 'fs';
  2. import * as path from 'path';
  3. import * as Twitter from 'twitter';
  4. import TwitterTypes from 'twitter-d';
  5. import { getLogger } from './loggers';
  6. import QQBot from './koishi';
  7. import RedisSvc from './redis';
  8. import { chainPromises, BigNumOps } from './utils';
  9. import Webshot from './webshot';
  10. interface IWorkerOption {
  11. lock: ILock;
  12. lockfile: string;
  13. bot: QQBot;
  14. workInterval: number;
  15. webshotDelay: number;
  16. consumerKey: string;
  17. consumerSecret: string;
  18. accessTokenKey: string;
  19. accessTokenSecret: string;
  20. mode: number;
  21. wsUrl: string;
  22. redis?: IRedisConfig;
  23. }
  24. export class ScreenNameNormalizer {
  25. // tslint:disable-next-line: variable-name
  26. public static _queryUser: (username: string) => Promise<string>;
  27. public static normalize = (username: string) => username.toLowerCase().replace(/^@/, '');
  28. public static async normalizeLive(username: string) {
  29. if (this._queryUser) {
  30. return await this._queryUser(username)
  31. .catch((err: {code: number, message: string}[]) => {
  32. if (err[0].code !== 50) {
  33. logger.warn(`error looking up user: ${err[0].message}`);
  34. return username;
  35. }
  36. return null;
  37. });
  38. }
  39. return this.normalize(username);
  40. }
  41. }
  42. export let sendTweet = (id: string, receiver: IChat, forceRefresh: boolean): void => {
  43. throw Error();
  44. };
  45. export interface ITimelineQueryConfig {
  46. username: string;
  47. count?: number;
  48. since?: string;
  49. until?: string;
  50. noreps?: boolean;
  51. norts?: boolean;
  52. }
  53. export let sendTimeline = (
  54. conf: {[key in keyof ITimelineQueryConfig]: string},
  55. receiver: IChat
  56. ): void => {
  57. throw Error();
  58. };
  59. const TWITTER_EPOCH = 1288834974657;
  60. const snowflake = (epoch: number) => Number.isNaN(epoch) ? undefined :
  61. BigNumOps.lShift(String(epoch - 1 - TWITTER_EPOCH), 22);
  62. const logger = getLogger('twitter');
  63. const maxTrials = 3;
  64. const retryInterval = 1500;
  65. const ordinal = (n: number) => {
  66. switch ((Math.trunc(n / 10) % 10 === 1) ? 0 : n % 10) {
  67. case 1:
  68. return `${n}st`;
  69. case 2:
  70. return `${n}nd`;
  71. case 3:
  72. return `${n}rd`;
  73. default:
  74. return `${n}th`;
  75. }
  76. };
  77. const retryOnError = <T, U>(
  78. doWork: () => Promise<T>,
  79. onRetry: (error, count: number, terminate: (defaultValue: U) => void) => void
  80. ) => new Promise<T | U>(resolve => {
  81. const retry = (reason, count: number) => {
  82. setTimeout(() => {
  83. let terminate = false;
  84. onRetry(reason, count, defaultValue => { terminate = true; resolve(defaultValue); });
  85. if (!terminate) doWork().then(resolve).catch(error => retry(error, count + 1));
  86. }, retryInterval);
  87. };
  88. doWork().then(resolve).catch(error => retry(error, 1));
  89. });
  90. export type FullUser = TwitterTypes.FullUser;
  91. export type Entities = TwitterTypes.Entities;
  92. export type ExtendedEntities = TwitterTypes.ExtendedEntities;
  93. export type MediaEntity = TwitterTypes.MediaEntity;
  94. export interface Tweet extends TwitterTypes.Status {
  95. user: FullUser;
  96. retweeted_status?: Tweet;
  97. }
  98. export default class {
  99. private client: Twitter;
  100. private lock: ILock;
  101. private lockfile: string;
  102. private workInterval: number;
  103. private bot: QQBot;
  104. private webshotDelay: number;
  105. private webshot: Webshot;
  106. private mode: number;
  107. private wsUrl: string;
  108. private redis: RedisSvc;
  109. constructor(opt: IWorkerOption) {
  110. this.client = new Twitter({
  111. consumer_key: opt.consumerKey,
  112. consumer_secret: opt.consumerSecret,
  113. access_token_key: opt.accessTokenKey,
  114. access_token_secret: opt.accessTokenSecret,
  115. });
  116. this.lockfile = opt.lockfile;
  117. this.lock = opt.lock;
  118. this.workInterval = opt.workInterval;
  119. this.bot = opt.bot;
  120. this.webshotDelay = opt.webshotDelay;
  121. this.mode = opt.mode;
  122. this.wsUrl = opt.wsUrl;
  123. if (opt.redis) this.redis = new RedisSvc(opt.redis);
  124. ScreenNameNormalizer._queryUser = this.queryUser;
  125. sendTweet = (idOrQuery, receiver, forceRefresh) => {
  126. const send = (id: string) => this.getTweet(
  127. id,
  128. this.sendTweets({sourceInfo: `tweet ${id}`, reportOnSkip: true, force: forceRefresh}, receiver),
  129. forceRefresh
  130. )
  131. .catch((err: {code: number, message: string}[]) => {
  132. if (err[0]?.code === 34)
  133. return this.bot.sendTo(receiver, `找不到用户 ${match[2].replace(/^@?(.*)$/, '@$1')}。`);
  134. if (err[0].code !== 144) {
  135. logger.warn(`error retrieving tweet: ${err[0].message}`);
  136. this.bot.sendTo(receiver, `获取推文时出现错误:${err[0].message}`);
  137. }
  138. this.bot.sendTo(receiver, '找不到请求的推文,它可能已被删除。');
  139. });
  140. const match = /^last(|-\d+)@([^\/?#,]+)((?:,no.*?=[^,]*)*)$/.exec(idOrQuery);
  141. const query = () => this.queryTimeline({
  142. username: match[2],
  143. count: 1 - Number(match[1]),
  144. noreps: {on: true, off: false}[match[3].replace(/.*,noreps=([^,]*).*/, '$1')],
  145. norts: {on: true, off: false}[match[3].replace(/.*,norts=([^,]*).*/, '$1')],
  146. }).then(tweets => tweets.slice(-1)[0].id_str);
  147. (match ? query() : Promise.resolve(idOrQuery)).then(send);
  148. };
  149. sendTimeline = ({username, count, since, until, noreps, norts}, receiver) => {
  150. const countNum = Number(count) || 10;
  151. (countNum > 0 ? this.queryTimeline : this.queryTimelineReverse)({
  152. username,
  153. count: Math.abs(countNum),
  154. since: BigNumOps.parse(since) || snowflake(new Date(since).getTime()),
  155. until: BigNumOps.parse(until) || snowflake(new Date(until).getTime()),
  156. noreps: {on: true, off: false}[noreps],
  157. norts: {on: true, off: false}[norts],
  158. })
  159. .then(tweets => chainPromises(
  160. tweets.map(tweet => () => this.bot.sendTo(receiver, `\
  161. 编号:${tweet.id_str}
  162. 时间:${tweet.created_at}
  163. 媒体:${tweet.extended_entities ? '有' : '无'}
  164. 正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`
  165. ))
  166. .concat(() => this.bot.sendTo(receiver, tweets.length ?
  167. '时间线查询完毕,使用 /twitter_view <编号> 查看推文详细内容。' :
  168. '时间线查询完毕,没有找到符合条件的推文。'
  169. ))
  170. ))
  171. .catch((err: {code: number, message: string}[]) => {
  172. if (err[0]?.code !== 34) {
  173. logger.warn(`error retrieving timeline: ${err[0]?.message || err}`);
  174. return this.bot.sendTo(receiver, `获取时间线时出现错误:${err[0]?.message || err}`);
  175. }
  176. this.bot.sendTo(receiver, `找不到用户 ${username.replace(/^@?(.*)$/, '@$1')}。`);
  177. });
  178. };
  179. }
  180. public launch = () => {
  181. this.webshot = new Webshot(
  182. this.wsUrl,
  183. this.mode,
  184. () => setTimeout(this.work, this.workInterval * 1000)
  185. );
  186. };
  187. public queryUser = (username: string) => this.client.get('users/show', {screen_name: username})
  188. .then((user: FullUser) => user.screen_name);
  189. public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
  190. if (!conf.since) return this.queryTimeline(conf);
  191. const count = conf.count;
  192. const maxID = conf.until;
  193. conf.count = undefined;
  194. const until = () =>
  195. BigNumOps.min(maxID, BigNumOps.plus(conf.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
  196. conf.until = until();
  197. const promise = (tweets: Tweet[]): Promise<Tweet[]> =>
  198. this.queryTimeline(conf).then(newTweets => {
  199. tweets = newTweets.concat(tweets);
  200. conf.since = conf.until;
  201. conf.until = until();
  202. if (
  203. tweets.length >= count ||
  204. BigNumOps.compare(conf.since, conf.until) >= 0
  205. ) {
  206. return tweets.slice(-count);
  207. }
  208. return promise(tweets);
  209. });
  210. return promise([]);
  211. };
  212. public queryTimeline = (
  213. { username, count, since, until, noreps, norts }: ITimelineQueryConfig
  214. ) => {
  215. username = username.replace(/^@?(.*)$/, '@$1');
  216. logger.info(`querying timeline of ${username} with config: ${
  217. JSON.stringify({
  218. ...(count && {count}),
  219. ...(since && {since}),
  220. ...(until && {until}),
  221. ...(noreps && {noreps}),
  222. ...(norts && {norts}),
  223. })}`);
  224. const fetchTimeline = (
  225. config = {
  226. screen_name: username.slice(1),
  227. trim_user: true,
  228. exclude_replies: noreps ?? true,
  229. include_rts: !(norts ?? false),
  230. since_id: since,
  231. max_id: until,
  232. tweet_mode: 'extended',
  233. },
  234. tweets: Tweet[] = []
  235. ): Promise<Tweet[]> => this.client.get('statuses/user_timeline', config)
  236. .then((newTweets: Tweet[]) => {
  237. if (newTweets.length) {
  238. logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
  239. config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
  240. logger.info(`timeline query of ${username} yielded ${
  241. newTweets.length
  242. } new tweets, next query will start at offset ${config.max_id}`);
  243. tweets.push(...newTweets);
  244. }
  245. if (!newTweets.length || tweets.length >= count) {
  246. logger.info(`timeline query of ${username} finished successfully, ${
  247. tweets.length
  248. } tweets have been fetched`);
  249. return tweets.slice(0, count);
  250. }
  251. return fetchTimeline(config, tweets);
  252. });
  253. return fetchTimeline();
  254. };
  255. private workOnTweets = (
  256. tweets: Tweet[],
  257. sendTweets: (cacheId: string, msg: string, text: string, author: string) => void,
  258. refresh = false
  259. ) => Promise.all(tweets.map(tweet =>
  260. ((this.redis && !refresh) ?
  261. this.redis.waitForProcess(`webshot/${tweet.id_str}`, this.webshotDelay * 4)
  262. .then(() => this.redis.getContent(`webshot/${tweet.id_str}`)) :
  263. Promise.reject())
  264. .then(content => {
  265. if (content === null) throw Error();
  266. logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
  267. const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
  268. let cacheId = tweet.id_str;
  269. if (tweet.retweeted_status) cacheId += `,rt:${tweet.retweeted_status.id_str}`;
  270. sendTweets(cacheId, msg, text, author);
  271. return null as Tweet;
  272. })
  273. .catch(() => {
  274. this.redis.startProcess(`webshot/${tweet.id_str}`);
  275. return tweet;
  276. })
  277. )).then(tweets =>
  278. this.webshot(
  279. tweets.filter(t => t),
  280. (cacheId: string, msg: string, text: string, author: string) => {
  281. Promise.resolve()
  282. .then(() => {
  283. if (!this.redis) return;
  284. const [twid, rtid] = cacheId.split(',rt:');
  285. logger.info(`caching webshot of tweet ${twid} to redis database`);
  286. this.redis.cacheContent(`webshot/${twid}`, JSON.stringify({msg, text, author, rtid}))
  287. .then(() => this.redis.finishProcess(`webshot/${twid}`));
  288. })
  289. .then(() => sendTweets(cacheId, msg, text, author));
  290. },
  291. this.webshotDelay
  292. )
  293. );
  294. public getTweet = (
  295. id: string,
  296. sender: (cacheId: string, msg: string, text: string, author: string) => void,
  297. refresh = false
  298. ) => {
  299. const endpoint = 'statuses/show';
  300. const config = {
  301. id,
  302. tweet_mode: 'extended',
  303. };
  304. return ((this.redis && !refresh) ?
  305. this.redis.waitForProcess(`webshot/${id}`, this.webshotDelay * 4)
  306. .then(() => this.redis.getContent(`webshot/${id}`))
  307. .then(content => {
  308. if (content === null) throw Error();
  309. const {rtid} = JSON.parse(content);
  310. return {id_str: id, retweeted_status: rtid ? {id_str: rtid} : undefined} as Tweet;
  311. }) :
  312. Promise.reject())
  313. .catch(() => this.client.get(endpoint, config))
  314. .then((tweet: Tweet) => {
  315. if (tweet.id) {
  316. logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
  317. } else {
  318. logger.debug(`skipped querying api as this tweet has been cached`)
  319. }
  320. return this.workOnTweets([tweet], sender, refresh);
  321. });
  322. };
  323. private sendTweets = (
  324. config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean}
  325. = {reportOnSkip: false, force: false},
  326. ...to: IChat[]
  327. ) => (id: string, msg: string, text: string, author: string) => {
  328. to.forEach(subscriber => {
  329. const [twid, rtid] = id.split(',rt:');
  330. const {sourceInfo: source, reportOnSkip, force} = config;
  331. const targetStr = JSON.stringify(subscriber);
  332. const send = () => retryOnError(
  333. () => this.bot.sendTo(subscriber, msg),
  334. (_, count, terminate: (doNothing: Promise<void>) => void) => {
  335. if (count <= maxTrials) {
  336. logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
  337. } else {
  338. logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
  339. terminate(this.bot.sendTo(subscriber, author + text, true));
  340. }
  341. }
  342. ).then(() => {
  343. if (this.redis) {
  344. logger.info(`caching push status of tweet ${rtid ? `${rtid} (RTed as ${twid})` : twid} for ${targetStr}...`);
  345. return this.redis.cacheForChat(rtid || twid, subscriber);
  346. }
  347. });
  348. ((this.redis && !force) ? this.redis.isCachedForChat(rtid || twid, subscriber) : Promise.resolve(false))
  349. .then(isCached => {
  350. if (isCached) {
  351. logger.info(`skipped subscriber ${targetStr} as tweet ${rtid ? `${rtid} (or its RT)` : twid} has been sent already`);
  352. if (!reportOnSkip) return;
  353. text = `[最近发送过的推文:${rtid || twid}]`;
  354. msg = author + text;
  355. }
  356. logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
  357. return send();
  358. });
  359. });
  360. };
  361. public work = () => {
  362. const lock = this.lock;
  363. if (this.workInterval < 1) this.workInterval = 1;
  364. if (lock.feed.length === 0) {
  365. setTimeout(() => {
  366. this.work();
  367. }, this.workInterval * 1000);
  368. return;
  369. }
  370. if (lock.workon >= lock.feed.length) lock.workon = 0;
  371. if (!lock.threads[lock.feed[lock.workon]] ||
  372. !lock.threads[lock.feed[lock.workon]].subscribers ||
  373. lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
  374. logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
  375. delete lock.threads[lock.feed[lock.workon]];
  376. lock.feed.splice(lock.workon, 1);
  377. fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
  378. this.work();
  379. return;
  380. }
  381. const currentFeed = lock.feed[lock.workon];
  382. logger.debug(`pulling feed ${currentFeed}`);
  383. const promise = new Promise(resolve => {
  384. let match = /https:\/\/twitter.com\/([^\/]+)\/lists\/([^\/]+)/.exec(currentFeed);
  385. let config: {[key: string]: any};
  386. let endpoint: string;
  387. if (match) {
  388. if (match[1] === 'i') {
  389. config = {
  390. list_id: match[2],
  391. tweet_mode: 'extended',
  392. };
  393. } else {
  394. config = {
  395. owner_screen_name: match[1],
  396. slug: match[2],
  397. tweet_mode: 'extended',
  398. };
  399. }
  400. endpoint = 'lists/statuses';
  401. } else {
  402. match = /https:\/\/twitter.com\/([^\/]+)/.exec(currentFeed);
  403. if (match) {
  404. config = {
  405. screen_name: match[1],
  406. exclude_replies: false,
  407. tweet_mode: 'extended',
  408. };
  409. endpoint = 'statuses/user_timeline';
  410. }
  411. }
  412. if (endpoint) {
  413. const offset = lock.threads[currentFeed].offset;
  414. if (offset as unknown as number > 0) config.since_id = offset;
  415. const getMore = (gotTweets: Tweet[] = []) => this.client.get(
  416. endpoint, config, (error: {[key: string]: any}[], tweets: Tweet[]
  417. ) => {
  418. if (error) {
  419. if (error instanceof Array && error.length > 0 && error[0].code === 34) {
  420. logger.warn(`error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
  421. lock.threads[currentFeed].subscribers.forEach(subscriber => {
  422. logger.info(`sending notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}`);
  423. this.bot.sendTo(subscriber, `链接 ${currentFeed} 指向的用户或列表不存在,请退订。`).catch();
  424. });
  425. } else {
  426. logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
  427. }
  428. }
  429. if (!tweets || tweets.length <= 1) return resolve(gotTweets);
  430. config.max_id = tweets.slice(-1)[0].id_str;
  431. getMore(gotTweets.concat(tweets));
  432. });
  433. getMore();
  434. }
  435. });
  436. promise.then((tweets: Tweet[]) => {
  437. logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
  438. const currentThread = lock.threads[currentFeed];
  439. const updateDate = () => currentThread.updatedAt = new Date().toString();
  440. if (!tweets || tweets.length === 0) { updateDate(); return; }
  441. const topOfFeed = tweets[0].id_str;
  442. const updateOffset = () => currentThread.offset = topOfFeed;
  443. if (currentThread.offset === '-1') { updateOffset(); return; }
  444. if (currentThread.offset === '0') tweets.splice(1);
  445. return this.workOnTweets(tweets, this.sendTweets({sourceInfo: `thread ${currentFeed}`}, ...currentThread.subscribers))
  446. .then(updateDate).then(updateOffset);
  447. })
  448. .then(() => {
  449. lock.workon++;
  450. let timeout = this.workInterval * 1000 / lock.feed.length;
  451. if (timeout < 1000) timeout = 1000;
  452. fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
  453. setTimeout(() => {
  454. this.work();
  455. }, timeout);
  456. });
  457. };
  458. }