import * as fs from 'fs';
import * as path from 'path';
import * as Twitter from 'twitter';
import TwitterTypes from 'twitter-d';

import { getLogger } from './loggers';
import QQBot from './koishi';
import RedisSvc from './redis';
import { chainPromises, BigNumOps } from './utils';
import Webshot from './webshot';

interface IWorkerOption {
  lock: ILock;
  lockfile: string;
  bot: QQBot;
  workInterval: number;
  webshotDelay: number;
  consumerKey: string;
  consumerSecret: string;
  accessTokenKey: string;
  accessTokenSecret: string;
  mode: number;
  wsUrl: string;
  redis?: IRedisConfig;
}

/** One entry of the error array the `twitter` client rejects with on API errors. */
interface ITwitterApiError {
  code: number;
  message: string;
}

/**
 * Extracts the first Twitter API error from a rejection value.
 *
 * API failures arrive as an array of `{code, message}` objects, but network
 * failures and programming errors arrive as plain `Error`s; for those this
 * returns `undefined` instead of throwing on `err[0].code`.
 */
const firstApiError = (err: unknown): ITwitterApiError | undefined =>
  Array.isArray(err) ? err[0] : undefined;

/** Renders an arbitrary rejection value (API error array or `Error`) for logging. */
const describeError = (err: unknown) =>
  err instanceof Error ? err.message : JSON.stringify(err);

/**
 * Parses an `on`/`off` option value into a boolean.
 * Any other value (including none) yields `undefined`, meaning "use the default".
 */
const parseOnOff = (value?: string) =>
  value === 'on' ? true : value === 'off' ? false : undefined;

const logger = getLogger('twitter');

/**
 * Screen-name normalization helpers.
 *
 * `normalize` works offline (lower-case, strip one leading `@`);
 * `normalizeLive` asks the API for the canonical spelling once a worker has
 * installed `_queryUser`.
 */
export class ScreenNameNormalizer {

  // tslint:disable-next-line: variable-name
  public static _queryUser: (username: string) => Promise<string>;

  public static normalize = (username: string) => username.toLowerCase().replace(/^@/, '');

  /**
   * Resolves `username` to the screen name reported by the API.
   *
   * @returns the API's screen name; `null` when the lookup fails with error
   *   code 50 (user not found); `username` unchanged on any other failure; the
   *   offline `normalize` result when no live lookup has been installed.
   */
  public static async normalizeLive(username: string) {
    if (this._queryUser) {
      return await this._queryUser(username)
        .catch((err: unknown) => {
          const apiError = firstApiError(err);
          if (apiError?.code === 50) return null;
          // Non-API failures (e.g. network errors) are treated like any other failed lookup.
          logger.warn(`error looking up user: ${apiError?.message ?? describeError(err)}`);
          return username;
        });
    }
    return this.normalize(username);
  }
}

/**
 * Sends a tweet, given by ID or by a `last[-N]@user[,noreps=on|off][,norts=on|off]`
 * query, to `receiver`. Replaced by the worker constructor; calling it before a
 * worker exists throws.
 */
export let sendTweet = (id: string, receiver: IChat, forceRefresh: boolean): void => {
  throw Error();
};

/** Parameters of a user-timeline query; `since`/`until` are tweet (snowflake) IDs. */
export interface ITimelineQueryConfig {
  username: string;
  count?: number;
  since?: string;
  until?: string;
  noreps?: boolean;
  norts?: boolean;
}

/**
 * Sends a summary of a user's timeline to `receiver`; all options arrive as raw
 * strings. Replaced by the worker constructor; calling it before a worker exists throws.
 */
export let sendTimeline = (
  conf: {[key in keyof ITimelineQueryConfig]: string},
  receiver: IChat
): void => {
  throw Error();
};

/** Millisecond Unix time that snowflake ID timestamps are counted from. */
const TWITTER_EPOCH = 1288834974657;

/**
 * Builds a snowflake ID whose timestamp bits encode `epoch - 1` (ms since the
 * Unix epoch) and whose lower 22 bits are zero; `undefined` for NaN input
 * (e.g. an unparsable date).
 */
const snowflake = (epoch: number) => Number.isNaN(epoch) ? undefined :
  BigNumOps.lShift(String(epoch - 1 - TWITTER_EPOCH), 22);

/** Number of retries of a message chain before falling back to plain text. */
const maxTrials = 3;
/** Delay between send attempts, in ms. */
const retryInterval = 1500;

/** Formats `n` as an English ordinal: 1st, 2nd, 3rd, 4th, …, 11th–13th, 21st, … */
const ordinal = (n: number) => {
  switch ((Math.trunc(n / 10) % 10 === 1) ? 0 : n % 10) {
    case 1:
      return `${n}st`;
    case 2:
      return `${n}nd`;
    case 3:
      return `${n}rd`;
    default:
      return `${n}th`;
  }
};

/**
 * Runs `doWork` and keeps retrying it every `retryInterval` ms while it rejects.
 *
 * Before each retry `onRetry(error, attempt, terminate)` is called; calling
 * `terminate(value)` stops retrying and resolves with `value` instead. The
 * returned promise never rejects on its own: it settles only through success or
 * `terminate` (if `value` is itself a promise, it is adopted).
 */
const retryOnError = <T, U>(
  doWork: () => Promise<T>,
  onRetry: (error: unknown, count: number, terminate: (defaultValue: U) => void) => void
) => new Promise<T | U>(resolve => {
  const retry = (reason: unknown, count: number) => {
    setTimeout(() => {
      let terminate = false;
      onRetry(reason, count, defaultValue => {
        terminate = true;
        resolve(defaultValue);
      });
      if (!terminate) doWork().then(resolve).catch(error => retry(error, count + 1));
    }, retryInterval);
  };
  doWork().then(resolve).catch(error => retry(error, 1));
});

export type FullUser = TwitterTypes.FullUser;
export type Entities = TwitterTypes.Entities;
export type ExtendedEntities = TwitterTypes.ExtendedEntities;
export type MediaEntity = TwitterTypes.MediaEntity;

interface ITweet extends TwitterTypes.Status {
  user: FullUser;
  retweeted_status?: Tweet;
}

export type Tweet = ITweet;
export type Tweets = ITweet[];

/**
 * Twitter worker: polls the subscribed feeds recorded in the lock, renders new
 * tweets through Webshot and pushes them to subscribers via the bot. On
 * construction it also installs the live implementations of
 * `ScreenNameNormalizer._queryUser`, `sendTweet` and `sendTimeline`.
 */
export default class {

  private client: Twitter;
  private lock: ILock;
  private lockfile: string;
  private workInterval: number;
  private bot: QQBot;
  private webshotDelay: number;
  private webshot: Webshot;
  private mode: number;
  private wsUrl: string;
  private redis: RedisSvc;

  constructor(opt: IWorkerOption) {
    this.client = new Twitter({
      consumer_key: opt.consumerKey,
      consumer_secret: opt.consumerSecret,
      access_token_key: opt.accessTokenKey,
      access_token_secret: opt.accessTokenSecret,
    });
    this.lockfile = opt.lockfile;
    this.lock = opt.lock;
    this.workInterval = opt.workInterval;
    this.bot = opt.bot;
    this.webshotDelay = opt.webshotDelay;
    this.mode = opt.mode;
    this.wsUrl = opt.wsUrl;
    if (opt.redis) this.redis = new RedisSvc(opt.redis);

    ScreenNameNormalizer._queryUser = this.queryUser;

    sendTweet = (idOrQuery, receiver, forceRefresh) => {
      const notFound = () => this.bot.sendTo(receiver, '找不到请求的推文,它可能已被删除。');
      // `last@user` selects the newest tweet of @user, `last-N@user` the one N
      // positions older; optional `,noreps=…` / `,norts=…` suffixes filter the timeline.
      const match = /^last(|-\d+)@([^\/?#,]+)((?:,no.*?=[^,]*)*)$/.exec(idOrQuery);
      const lookup: Promise<string | undefined> = match
        ? this.queryTimeline({
          username: match[2],
          count: 1 - Number(match[1]),
          noreps: parseOnOff(match[3].replace(/.*,noreps=([^,]*).*/, '$1')),
          norts: parseOnOff(match[3].replace(/.*,norts=([^,]*).*/, '$1')),
        }).then(tweets => tweets.slice(-1)[0]?.id_str)
        : Promise.resolve(idOrQuery);
      lookup
        .then((id): Promise<unknown> => {
          // An empty timeline has no tweet to show.
          if (id === undefined) return notFound();
          return this.getTweet(
            id,
            this.sendTweets({sourceInfo: `tweet ${id}`, reportOnSkip: true, force: forceRefresh}, receiver),
            forceRefresh
          );
        })
        .catch((err: unknown) => {
          const apiError = firstApiError(err);
          if (apiError?.code === 34 && match) {
            return this.bot.sendTo(receiver, `找不到用户 ${match[2].replace(/^@?(.*)$/, '@$1')}。`);
          }
          if (apiError?.code === 144) return notFound();
          // Any other failure: report it once, without also claiming the tweet was deleted.
          const message = apiError?.message ?? describeError(err);
          logger.warn(`error retrieving tweet: ${message}`);
          return this.bot.sendTo(receiver, `获取推文时出现错误:${message}`);
        });
    };

    sendTimeline = ({username, count, since, until, noreps, norts}, receiver) => {
      const countNum = Number(count) || 10;
      // A positive count lists the newest tweets; a negative one lists the
      // earliest tweets after `since`.
      (countNum > 0 ? this.queryTimeline : this.queryTimelineReverse)({
        username,
        count: Math.abs(countNum),
        since: BigNumOps.parse(since) || snowflake(new Date(since).getTime()),
        until: BigNumOps.parse(until) || snowflake(new Date(until).getTime()),
        noreps: parseOnOff(noreps),
        norts: parseOnOff(norts),
      })
        .then(tweets => chainPromises(
          tweets.map(tweet => () => this.bot.sendTo(receiver,
            `编号:${tweet.id_str}\n` +
            `时间:${tweet.created_at}\n` +
            `媒体:${tweet.extended_entities ? '有' : '无'}\n` +
            `正文:\n${tweet.full_text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`
          ))
            .concat(() => this.bot.sendTo(receiver, tweets.length ?
              '时间线查询完毕,使用 /twitter_view <编号> 查看推文详细内容。' :
              '时间线查询完毕,没有找到符合条件的推文。'
            ))
        ))
        .catch((err: unknown) => {
          const apiError = firstApiError(err);
          if (apiError?.code !== 34) {
            logger.warn(`error retrieving timeline: ${apiError?.message || err}`);
            return this.bot.sendTo(receiver, `获取时间线时出现错误:${apiError?.message || err}`);
          }
          return this.bot.sendTo(receiver, `找不到用户 ${username.replace(/^@?(.*)$/, '@$1')}。`);
        });
    };
  }

  /**
   * Creates the Webshot renderer. The callback handed to it schedules the first
   * `work` run `workInterval` seconds later (presumably Webshot calls it once it
   * is ready — see ./webshot).
   */
  public launch = () => {
    this.webshot = new Webshot(
      this.wsUrl,
      this.mode,
      () => setTimeout(this.work, this.workInterval * 1000)
    );
  };

  /** Looks up `username` via `users/show` and resolves with its screen name as the API spells it. */
  public queryUser = (username: string) =>
    this.client.get('users/show', {screen_name: username})
      .then((user: FullUser) => user.screen_name);

  /**
   * Collects the `count` earliest tweets after `conf.since`.
   *
   * It queries forward in windows of seven days (expressed in snowflake-ID
   * space) until at least `count` tweets are gathered or `conf.until` is
   * reached. Without `since` this is just `queryTimeline`. The caller's `conf`
   * is not modified.
   */
  public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
    if (!conf.since) return this.queryTimeline(conf);
    const count = conf.count;
    const maxID = conf.until;
    const query: ITimelineQueryConfig = {...conf, count: undefined};
    // Seven days in milliseconds, shifted into the timestamp bits of a snowflake ID.
    const windowEnd = () =>
      BigNumOps.min(maxID, BigNumOps.plus(query.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
    query.until = windowEnd();
    const collect = (tweets: ITweet[]): Promise<ITweet[]> => this.queryTimeline(query).then(newTweets => {
      // Later windows hold newer tweets, so they go in front.
      tweets = newTweets.concat(tweets);
      query.since = query.until;
      query.until = windowEnd();
      if (
        tweets.length >= count ||
        BigNumOps.compare(query.since, query.until) >= 0
      ) {
        return tweets.slice(-count);
      }
      return collect(tweets);
    });
    return collect([]);
  };

  /**
   * Fetches tweets of `username` (leading `@` optional) from
   * `statuses/user_timeline`, paging with `max_id` until `count` tweets are
   * collected or a page comes back empty.
   *
   * Replies are excluded unless `noreps` is false; retweets are included unless
   * `norts` is true. Resolves with at most `count` tweets (all of them when
   * `count` is unset).
   */
  public queryTimeline = (
    { username, count, since, until, noreps, norts }: ITimelineQueryConfig
  ) => {
    username = username.replace(/^@?(.*)$/, '@$1');
    logger.info(`querying timeline of ${username} with config: ${
      JSON.stringify({
        ...(count && {count}),
        ...(since && {since}),
        ...(until && {until}),
        ...(noreps && {noreps}),
        ...(norts && {norts}),
      })}`);
    const fetchTimeline = (
      config = {
        screen_name: username.slice(1),
        trim_user: true,
        exclude_replies: noreps ?? true,
        include_rts: !(norts ?? false),
        since_id: since,
        max_id: until,
        tweet_mode: 'extended',
      },
      tweets: ITweet[] = []
    ): Promise<ITweet[]> => this.client.get('statuses/user_timeline', config)
      .then((newTweets: ITweet[]) => {
        if (newTweets.length) {
          logger.debug(`fetched tweets: ${JSON.stringify(newTweets)}`);
          // Continue strictly below the oldest tweet of this page.
          config.max_id = BigNumOps.plus('-1', newTweets[newTweets.length - 1].id_str);
          logger.info(`timeline query of ${username} yielded ${
            newTweets.length
          } new tweets, next query will start at offset ${config.max_id}`);
          tweets.push(...newTweets);
        }
        if (!newTweets.length || tweets.length >= count) {
          logger.info(`timeline query of ${username} finished successfully, ${
            tweets.length
          } tweets have been fetched`);
          return tweets.slice(0, count);
        }
        return fetchTimeline(config, tweets);
      });
    return fetchTimeline();
  };

  /**
   * Hands every tweet to `sendTweets` as `(id, msg, text, author)`.
   *
   * Unless `refresh` is set or redis is absent, a webshot cached under
   * `webshot/<tweet id>` is reused; in that case a retweet is reported under its
   * original tweet's ID. Otherwise the tweet is rendered by Webshot and, when
   * redis is configured, the result is cached before being sent.
   */
  private workOnTweets = (
    tweets: Tweets,
    sendTweets: (id: string, msg: string, text: string, author: string) => void,
    refresh = false
  ) => Promise.all(tweets.map(tweet =>
    ((this.redis && !refresh) ? this.redis.getContent(`webshot/${tweet.id_str}`) : Promise.reject())
      .then(content => {
        if (content === null) throw Error();
        logger.info(`retrieved cached webshot of tweet ${tweet.id_str} from redis database`);
        const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
        sendTweets(tweet.retweeted_status ? tweet.retweeted_status.id_str : tweet.id_str, msg, text, author);
      })
      // Cache miss, cache disabled or unreadable cache entry: render afresh.
      .catch(() => this.webshot([tweet], (id: string, msg: string, text: string, author: string) => {
        Promise.resolve()
          .then(() => {
            if (!this.redis) return;
            logger.info(`caching webshot of tweet ${tweet.id_str} to redis database`);
            this.redis.cacheContent(`webshot/${tweet.id_str}`, JSON.stringify({msg, text, author}));
          })
          .then(() => sendTweets(id, msg, text, author));
      }, this.webshotDelay))
  ));

  /** Fetches tweet `id` via `statuses/show` and passes it through `workOnTweets` to `sender`. */
  public getTweet = (
    id: string,
    sender: (id: string, msg: string, text: string, author: string) => void,
    refresh = false
  ) => {
    const endpoint = 'statuses/show';
    const config = {
      id,
      tweet_mode: 'extended',
    };
    return this.client.get(endpoint, config)
      .then((tweet: Tweet) => {
        logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
        return this.workOnTweets([tweet], sender, refresh);
      });
  };

  /**
   * Builds a `(id, msg, text, author)` callback that pushes a rendered tweet to
   * every chat in `to`.
   *
   * - Chats for which redis records `id` as already sent are skipped (unless
   *   `force`). With `reportOnSkip`, they get a short
   *   `[最近发送过的推文:<id>]` notice instead of the tweet.
   * - The message chain `msg` is retried up to `maxTrials` times, then sent as
   *   plain `author + text`.
   * - After a successful send, the push is recorded in redis.
   *
   * Delivery failures are logged, not thrown.
   */
  private sendTweets = (
    config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean} = {reportOnSkip: false, force: false},
    ...to: IChat[]
  ) => (id: string, msg: string, text: string, author: string) => {
    const {sourceInfo: source, reportOnSkip, force} = config;
    to.forEach(subscriber => {
      const targetStr = JSON.stringify(subscriber);
      // Message contents are passed in per subscriber so that one chat's
      // "already sent" notice cannot leak into another chat's delivery.
      const send = (chainMsg: string, plainText: string) => retryOnError(
        () => this.bot.sendTo(subscriber, chainMsg),
        (_, count, terminate: (doNothing: Promise<unknown>) => void) => {
          if (count <= maxTrials) {
            logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
          } else {
            logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
            terminate(this.bot.sendTo(subscriber, author + plainText, true));
          }
        }
      ).then(() => {
        if (this.redis) {
          logger.info(`caching push status of this tweet (or its origin in case of a retweet) for ${targetStr}...`);
          return this.redis.cacheForChat(id, subscriber);
        }
      });
      ((this.redis && !force) ? this.redis.isCachedForChat(id, subscriber) : Promise.resolve(false))
        .then(isCached => {
          let chainMsg = msg;
          let plainText = text;
          if (isCached) {
            logger.info(`skipped subscriber ${targetStr} as this tweet or the origin of this retweet has been sent already`);
            if (!reportOnSkip) return;
            plainText = `[最近发送过的推文:${id}]`;
            chainMsg = author + plainText;
          }
          logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
          return send(chainMsg, plainText);
        })
        .catch(err => logger.error(`failed to push data to ${targetStr}: ${describeError(err)}`));
    });
  };

  /**
   * Maps a feed URL to the REST endpoint and base parameters used to poll it.
   * List URLs use `lists/statuses` (by `list_id` for `twitter.com/i/lists/<id>`,
   * otherwise by owner and slug); other `twitter.com/<name>` URLs use
   * `statuses/user_timeline`. Returns `undefined` for anything else.
   */
  private feedRequest = (feed: string): {endpoint: string, config: {[key: string]: any}} | undefined => {
    const listMatch = /https:\/\/twitter.com\/([^\/]+)\/lists\/([^\/]+)/.exec(feed);
    if (listMatch) {
      const config = listMatch[1] === 'i'
        ? {list_id: listMatch[2], tweet_mode: 'extended'}
        : {owner_screen_name: listMatch[1], slug: listMatch[2], tweet_mode: 'extended'};
      return {endpoint: 'lists/statuses', config};
    }
    const userMatch = /https:\/\/twitter.com\/([^\/]+)/.exec(feed);
    if (userMatch) {
      return {
        endpoint: 'statuses/user_timeline',
        config: {screen_name: userMatch[1], exclude_replies: false, tweet_mode: 'extended'},
      };
    }
    return undefined;
  };

  /**
   * Fetches the tweets of `feed` newer than its stored offset.
   *
   * It pages with `max_id` until a page comes back empty, malformed or with an
   * error. If the feed or list does not exist (API code 34), its subscribers are
   * told to unsubscribe. Always resolves (with `[]` for unrecognized feeds), so
   * the polling loop keeps going.
   */
  private fetchFeed = (feed: string) => new Promise<Tweets>(resolve => {
    const request = this.feedRequest(feed);
    if (!request) {
      logger.warn(`feed ${feed} is neither a user nor a list link, skipping`);
      return resolve([]);
    }
    const {endpoint, config} = request;
    const offset = this.lock.threads[feed].offset;
    if (Number(offset) > 0) config.since_id = offset;
    const getMore = (gotTweets: Tweets = []) => this.client.get(
      endpoint, config, (error: {[key: string]: any}[], tweets: Tweets) => {
        if (error) {
          if (error instanceof Array && error.length > 0 && error[0].code === 34) {
            logger.warn(`error on fetching tweets for ${feed}: ${JSON.stringify(error)}`);
            (this.lock.threads[feed]?.subscribers ?? []).forEach(subscriber => {
              logger.info(`sending notfound message of ${feed} to ${JSON.stringify(subscriber)}`);
              this.bot.sendTo(subscriber, `链接 ${feed} 指向的用户或列表不存在,请退订。`)
                .catch(err => logger.warn(`failed to send notfound message to ${JSON.stringify(subscriber)}: ${describeError(err)}`));
            });
          } else {
            logger.error(`unhandled error on fetching tweets for ${feed}: ${describeError(error)}`);
          }
        }
        // On errors the payload need not be an array; stop paging and keep what we have.
        if (error || !Array.isArray(tweets) || tweets.length === 0) return resolve(gotTweets);
        // max_id is inclusive: continue strictly below the oldest tweet received so
        // page boundaries are neither duplicated nor (for a single tweet) dropped.
        config.max_id = BigNumOps.plus('-1', tweets[tweets.length - 1].id_str);
        getMore(gotTweets.concat(tweets));
      });
    getMore();
  });

  /**
   * Runs one polling step and schedules the next one.
   *
   * - Takes the next feed in `lock.feed` (round-robin via `lock.workon`).
   * - Drops the feed if nobody subscribes to it.
   * - Otherwise pushes its new tweets to the subscribers and advances the
   *   thread's offset.
   * - Persists the lock file, then reschedules itself so that all feeds are
   *   spread over `workInterval` seconds, with at least 1 s between steps.
   *
   * Offset `'-1'` only records the current top tweet; offset `'0'` pushes just
   * the newest tweet.
   */
  public work = () => {
    const lock = this.lock;
    if (this.workInterval < 1) this.workInterval = 1;
    if (lock.feed.length === 0) {
      setTimeout(() => {
        this.work();
      }, this.workInterval * 1000);
      return;
    }
    if (lock.workon >= lock.feed.length) lock.workon = 0;
    const currentFeed = lock.feed[lock.workon];
    const thread = lock.threads[currentFeed];
    if (!thread || !thread.subscribers || thread.subscribers.length === 0) {
      logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
      delete lock.threads[currentFeed];
      lock.feed.splice(lock.workon, 1);
      fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
      this.work();
      return;
    }

    logger.debug(`pulling feed ${currentFeed}`);
    this.fetchFeed(currentFeed)
      .then(tweets => {
        logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
        // Re-read: the thread may have been removed while the request was in flight.
        const currentThread = lock.threads[currentFeed];
        if (!currentThread) return;
        const updateDate = () => currentThread.updatedAt = new Date().toString();
        if (!tweets || tweets.length === 0) {
          updateDate();
          return;
        }
        const topOfFeed = tweets[0].id_str;
        const updateOffset = () => currentThread.offset = topOfFeed;
        if (currentThread.offset === '-1') {
          updateOffset();
          return;
        }
        if (currentThread.offset === '0') tweets.splice(1);
        return this.workOnTweets(tweets, this.sendTweets({sourceInfo: `thread ${currentFeed}`}, ...currentThread.subscribers))
          .then(updateDate).then(updateOffset);
      })
      // A failure on one feed must not stop the polling loop.
      .catch(err => logger.error(`error while processing feed ${currentFeed}: ${describeError(err)}`))
      .then(() => {
        lock.workon++;
        let timeout = this.workInterval * 1000 / lock.feed.length;
        if (timeout < 1000) timeout = 1000;
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        setTimeout(() => {
          this.work();
        }, timeout);
      });
  };
}