import * as fs from 'fs';
import * as path from 'path';
import * as Twitter from 'twitter-api-v2';
import { getLogger } from './loggers';
import QQBot, { Message } from './koishi';
import RedisSvc from './redis';
import { chainPromises, BigNumOps } from './utils';
import Webshot from './webshot';

/** Options consumed by the worker's constructor. */
interface IWorkerOption {
  lock: ILock;
  lockfile: string;
  bot: QQBot;
  workInterval: number;
  webshotDelay: number;
  consumerKey: string;
  consumerSecret: string;
  accessTokenKey?: string;
  accessTokenSecret?: string;
  mode: number;
  wsUrl: string;
  redis?: IRedisConfig;
}

/**
 * Splits a Twitter link (or a bare `user` / `user/list` shorthand) into parts.
 *
 * @returns `[user, '/lists/<slug>']`, `[user, '/status/<id>']` or `[user]`;
 *          `undefined` when the input matches none of the patterns.
 */
export const parseLink = (link: string): string[] => {
  let match =
    /twitter.com\/([^\/?#]+)\/lists\/([^\/?#]+)/.exec(link) ||
    /^([^\/?#]+)\/([^\/?#]+)$/.exec(link);
  if (match) return [match[1], `/lists/${match[2]}`];
  match = /twitter.com\/([^\/?#]+)\/status\/(\d+)/.exec(link);
  if (match) return [match[1], `/status/${match[2]}`];
  match =
    /twitter.com\/([^\/?#]+)/.exec(link) ||
    /^([^\/?#]+)$/.exec(link);
  if (match) return [match[1]];
  return;
}

/**
 * Builds `https://twitter.com/<userName><more>`.
 *
 * @returns `undefined` when `userName` is empty.
 */
export const linkBuilder = (userName: string, more = ''): string => {
  if (!userName) return;
  return `https://twitter.com/${userName}${more}`;
}

/** Helpers for turning user-supplied screen names into a canonical form. */
export class ScreenNameNormalizer {

  /**
   * Remote lookup hook resolving to `<username>:<id>`; installed by the
   * worker's constructor.
   */
  // tslint:disable-next-line: variable-name
  public static _queryUser: (username: string) => Promise<string>;

  /** Lower-cases the name and strips one leading `@`. */
  public static normalize = (username: string) => username.toLowerCase().replace(/^@/, '');

  /**
   * Normalizes locally, then (when the lookup hook is installed) asks the API
   * for the canonical screen name.
   *
   * @returns the canonical name; `null` when the API reports the user as not
   *          found; the locally normalized name when the lookup fails for any
   *          other reason.
   */
  public static async normalizeLive(username: string) {
    username = this.normalize(username);
    if (this._queryUser) {
      return await this._queryUser(username)
        .then(userNameId => userNameId.split(':')[0])
        .catch((err: Twitter.InlineErrorV2) => {
          // Same convention as the other handlers in this file: anything but
          // "Not Found Error" is an unexpected failure and must not be
          // mistaken for a missing user.
          if (err.title !== 'Not Found Error') {
            logger.warn(`error looking up user: ${showApiError(err)}`);
            return username;
          }
          return null;
        });
    }
    return username;
  }
}

/** Placeholder; replaced with the real implementation by the worker's constructor. */
export let sendTweet = (id: string, receiver: IChat, forceRefresh: boolean): void => {
  throw Error();
};

/** Parameters accepted by the timeline queries. */
export interface ITimelineQueryConfig {
  username: string;
  count?: number;
  since?: string;
  until?: string;
  noreps?: boolean;
  norts?: boolean;
}

/** Placeholder; replaced with the real implementation by the worker's constructor. */
export let sendTimeline = (
  conf: {[key in keyof ITimelineQueryConfig]: string},
  receiver: IChat
): void => {
  throw Error();
};

const TWITTER_EPOCH = 1288834974657;

/**
 * Maps a millisecond timestamp to an ID-shaped string:
 * `(epoch - 1 - TWITTER_EPOCH) << 22`. Returns `undefined` for `NaN`.
 */
const snowflake = (epoch: number) => Number.isNaN(epoch) ? undefined :
  BigNumOps.lShift(String(epoch - 1 - TWITTER_EPOCH), 22);

const logger = getLogger('twitter');
const maxTrials = 3;
const retryInterval = 1500;

/** Formats `n` as an English ordinal (`1st`, `2nd`, `3rd`, `11th`, ...). */
const ordinal = (n: number) => {
  // Numbers whose tens digit is 1 (11, 12, 13, 111, ...) always take "th".
  switch ((Math.trunc(n / 10) % 10 === 1) ? 0 : n % 10) {
    case 1:
      return `${n}st`;
    case 2:
      return `${n}nd`;
    case 3:
      return `${n}rd`;
    default:
      return `${n}th`;
  }
};

/**
 * Runs `doWork`, retrying after `retryInterval` ms each time it rejects.
 *
 * Before every retry `onRetry` is called with the rejection reason and the
 * 1-based retry count; calling its `terminate` argument stops retrying and
 * resolves the returned promise with the supplied value.
 */
const retryOnError = <T, U>(
  doWork: () => Promise<T>,
  onRetry: (error, count: number, terminate: (defaultValue: U) => void) => void
) => new Promise<T | U>(resolve => {
  const retry = (reason, count: number) => {
    setTimeout(() => {
      let terminate = false;
      onRetry(reason, count, defaultValue => { terminate = true; resolve(defaultValue); });
      if (!terminate) doWork().then(resolve).catch(error => retry(error, count + 1));
    }, retryInterval);
  };
  doWork().then(resolve).catch(error => retry(error, 1));
});

/** Picks the most specific human-readable description available on an error. */
const showApiError = (err: Partial<Twitter.InlineErrorV2 & Twitter.ErrorV2 & Error>) =>
  err.errors && err.errors[0].message || err.detail || err.stack || JSON.stringify(err);

/** Returns `o` unchanged, typed with `readonly` removed from nested arrays/tuples. */
const toMutableConst = <T>(o: T) => {
  // credits: https://stackoverflow.com/a/60493166
  type DeepMutableArrays<T> =
    (T extends object ? { [K in keyof T]: DeepMutableArrays<T[K]> } : T) extends infer O
      ? O extends ReadonlyArray<unknown>
        ? { -readonly [K in keyof O]: O[K] }
        : O
      : never;
  return o as DeepMutableArrays<T>
};

/** Expansions and fields requested for single-tweet lookups and feed polling. */
const v2SingleParams = toMutableConst({
  expansions: ['attachments.media_keys', 'author_id', 'referenced_tweets.id'],
  'tweet.fields': ['created_at', 'entities'],
  'media.fields': ['url', 'variants', 'alt_text'],
  'user.fields': ['id', 'name', 'username']
} as const);

/** `T` with the keys `K` made required. */
type PickRequired<T, K extends keyof T> = Pick<Required<T>, K> & T;

export type TweetObject = PickRequired<
  Twitter.TweetV2SingleResult['data'],
  typeof v2SingleParams['tweet.fields'][number]
>;

export type UserObject = PickRequired<
  Twitter.TweetV2SingleResult['includes']['users'][number],
  typeof v2SingleParams['user.fields'][number]
>;

export type MediaObject =
  Twitter.TweetV2SingleResult['includes']['media'][number] & (
    {type: 'video' | 'animated_gif', variants: Twitter.MediaVariantsV2[]} |
    {type: 'photo', url: string}
  );

export interface Tweet extends Twitter.TweetV2SingleResult {
  data: TweetObject,
  includes: {
    media: MediaObject[],
    users: UserObject[],
  }
};

/**
 * Worker that polls subscribed Twitter feeds, renders media tweets through
 * the webshot service and pushes them to chat subscribers.
 */
export default class {

  private client: Twitter.TwitterApiReadOnly;
  private lock: ILock;
  private lockfile: string;
  private workInterval: number;
  private bot: QQBot;
  private webshotDelay: number;
  private webshot: Webshot;
  private mode: number;
  private wsUrl: string;
  private redis: RedisSvc;

  /**
   * Stores the options, creates the API client and installs the
   * module-level hooks (`ScreenNameNormalizer._queryUser`, `sendTweet`,
   * `sendTimeline`).
   */
  constructor(opt: IWorkerOption) {
    this.client = new Twitter.TwitterApi({
      appKey: opt.consumerKey,
      appSecret: opt.consumerSecret,
    }).readOnly;
    this.lockfile = opt.lockfile;
    this.lock = opt.lock;
    this.workInterval = opt.workInterval;
    this.bot = opt.bot;
    this.webshotDelay = opt.webshotDelay;
    this.mode = opt.mode;
    this.wsUrl = opt.wsUrl;
    if (opt.redis) this.redis = new RedisSvc(opt.redis);
    ScreenNameNormalizer._queryUser = this.queryUser;

    sendTweet = (idOrQuery, receiver, forceRefresh) => {
      // Accepts either a tweet ID or `last[-N]@user[,noreps=on|off][,norts=on|off]`.
      const match = /^last(|-\d+)@([^\/?#,]+)((?:,no.*?=[^,]*)*)$/.exec(idOrQuery);
      const query = () => this.queryTimeline({
        username: match[2],
        count: 1 - Number(match[1]),
        noreps: {on: true, off: false}[match[3].replace(/.*,noreps=([^,]*).*/, '$1')],
        norts: {on: true, off: false}[match[3].replace(/.*,norts=([^,]*).*/, '$1')],
      }).then(tweets => tweets.slice(-1)[0].data.id);
      (match ? query() : Promise.resolve(idOrQuery))
        .then((id: string) => this.getTweet(
          id,
          this.sendTweets({sourceInfo: `tweet ${id}`, reportOnSkip: true, force: forceRefresh}, receiver),
          forceRefresh
        ))
        .catch((err: Twitter.InlineErrorV2) => {
          if (err.title !== 'Not Found Error') {
            logger.warn(`error retrieving tweet: ${showApiError(err)}`);
            // Return here so the "not found" notice below is not sent as well.
            return this.bot.sendTo(receiver, `获取推文时出现错误:${showApiError(err)}`);
          }
          if (err.resource_type === 'user') {
            return this.bot.sendTo(receiver, `找不到用户 ${match[2].replace(/^@?(.*)$/, '@$1')}。`);
          }
          this.bot.sendTo(receiver, '找不到请求的推文,它可能已被删除。');
        });
    };

    sendTimeline = ({username, count, since, until, noreps, norts}, receiver) => {
      const countNum = Number(count) || 10;
      // A negative count selects the reverse (oldest-first) query.
      (countNum > 0 ? this.queryTimeline : this.queryTimelineReverse)({
        username,
        count: Math.abs(countNum),
        since: BigNumOps.parse(since) || snowflake(new Date(since).getTime()),
        until: BigNumOps.parse(until) || snowflake(new Date(until).getTime()),
        noreps: {on: true, off: false}[noreps],
        norts: {on: true, off: false}[norts],
      })
        .then(tweets => chainPromises(
          tweets.map(({data}) => () => this.bot.sendTo(receiver, [
            `编号:${data.id}`,
            `时间:${data.created_at}`,
            `媒体:${(data.attachments || {}).media_keys ? '有' : '无'}`,
            `正文:\n${data.text.replace(/^([\s\S\n]{50})[\s\S\n]+?( https:\/\/t.co\/.*)?$/, '$1…$2')}`,
          ].join('\n')))
            .concat(() => this.bot.sendTo(receiver, tweets.length ?
              '时间线查询完毕,使用 /twipic_view <编号> 查看媒体推文详细内容。' :
              '时间线查询完毕,没有找到符合条件的媒体推文。'
            ))
        ))
        .catch((err: Twitter.InlineErrorV2) => {
          if (err.title !== 'Not Found Error') {
            logger.warn(`error retrieving timeline: ${showApiError(err)}`);
            return this.bot.sendTo(receiver, `获取时间线时出现错误:${showApiError(err)}`);
          }
          this.bot.sendTo(receiver, `找不到用户 ${username.replace(/^@?(.*)$/, '@$1')}。`);
        });
    };
  }

  /**
   * Performs the app login, then creates the webshot service; the callback
   * handed to it schedules the first `work` pass.
   */
  public launch = () => {
    this.client.appLogin().then(client => {
      this.client = client.readOnly;
      this.webshot = new Webshot(
        this.wsUrl,
        this.mode,
        () => setTimeout(this.work, this.workInterval * 1000)
      );
    }).catch(err => {
      logger.error(`error logging in to twitter api: ${showApiError(err)}`);
    });
  };

  /**
   * Resolves a screen name to `<username>:<id>`, using the ID stored on the
   * matching thread in the lock when there is one.
   *
   * @throws (rejects with) the first API error when the lookup reports errors.
   */
  public queryUser = (username: string) => {
    const thread = this.lock.threads[linkBuilder(username)];
    if (thread && thread.id) return Promise.resolve(`${username}:${thread.id}`);
    return this.client.v2.userByUsername(username).then(({data, errors}) => {
      // Check errors before touching `data`: an error response may carry no
      // `data` at all, and reading it first would mask the API error.
      if (errors && errors.length > 0) throw errors[0];
      if (thread) thread.id = data.id;
      return `${data.username}:${data.id}`;
    })
  }

  /**
   * Collects the oldest `conf.count` media tweets after `conf.since` by
   * querying successive one-week ID windows. Falls back to `queryTimeline`
   * when `conf.since` is not set. Note that `conf` is modified in place.
   */
  public queryTimelineReverse = (conf: ITimelineQueryConfig) => {
    if (!conf.since) return this.queryTimeline(conf);
    const count = conf.count;
    const maxID = conf.until;
    conf.count = undefined;
    // Upper bound of the current window: one week past `since`, capped at maxID.
    const until = () =>
      BigNumOps.min(maxID, BigNumOps.plus(conf.since, String(7 * 24 * 3600 * 1000 * 2 ** 22)));
    conf.until = until();
    const promise = (tweets: Tweet[]): Promise<Tweet[]> =>
      this.queryTimeline(conf).then(newTweets => {
        tweets = newTweets.concat(tweets);
        conf.since = conf.until;
        conf.until = until();
        if (
          tweets.length >= count ||
          BigNumOps.compare(conf.since, conf.until) >= 0
        ) {
          return tweets.slice(-count);
        }
        return promise(tweets);
      });
    return promise([]);
  };

  /**
   * Fetches media tweets from a user's timeline, paging backwards until
   * `count` have been collected or the timeline yields nothing more.
   * Replies are excluded unless `noreps` is `false`; retweets are excluded
   * only when `norts` is `true`.
   */
  public queryTimeline = (
    {username, count, since, until, noreps, norts}: ITimelineQueryConfig
  ) => {
    username = username.replace(/^@?(.*)$/, '@$1');
    return this.queryUser(username.slice(1)).then(userNameId => {
      const getMore = (lastTweets: Tweet[] = []) => {
        logger.info(`querying timeline of ${username} with config: ${
          JSON.stringify({
            ...(count && {count}),
            ...(since && {since}),
            ...(until && {until}),
            ...(noreps && {noreps}),
            ...(norts && {norts}),
          })}`);
        return this.get('userTimeline', userNameId.split(':')[1], {
          expansions: ['attachments.media_keys', 'author_id'],
          'tweet.fields': ['created_at'],
          exclude: [
            ...(noreps ?? true) ? ['replies' as const] : [],
            ...(norts ?? false) ? ['retweets' as const] : [],
          ],
          max_results: Math.min(Math.max(count || 0, 20), 100),
          ...(since && {since_id: since}),
          ...(until && {until_id: until}),
        }).then(newTweets => {
          logger.info(`timeline query of ${username} yielded ${newTweets.length} new tweets`);
          const tweets = lastTweets.concat(newTweets.filter(({data}) => (data.attachments || {}).media_keys));
          // Only page further when the last page had tweets; an empty page
          // means the timeline is exhausted and has no ID to continue from.
          if (newTweets.length > 0 && tweets.length < count) {
            until = BigNumOps.plus('-1', newTweets.slice(-1)[0].data.id);
            logger.info(`starting next query at offset ${until}...`);
            return getMore(tweets);
          }
          logger.info(`timeline query of ${username} finished successfully, ${
            tweets.length
          } media tweets have been fetched`);
          return tweets.slice(0, count);
        });
      }
      return getMore();
    });
  };

  /**
   * Delivers each tweet either from the redis webshot cache or, failing
   * that, by rendering it through the webshot service (caching the result
   * when redis is configured).
   *
   * @param refresh when `true`, the cache is not consulted.
   */
  private workOnTweets = (
    tweets: Tweet[],
    sendTweets: (cacheId: string, msg: string, text: string, author: string) => void,
    refresh = false
  ) => Promise.all(tweets.map(({data, includes}) =>
    ((this.redis && !refresh) ?
      this.redis.waitForProcess(`webshot/${data.id}`, this.webshotDelay * 4)
        .then(() => this.redis.getContent(`webshot/${data.id}`)) :
      Promise.reject())
      .then(content => {
        if (content === null) throw Error();
        logger.info(`retrieved cached webshot of tweet ${data.id} from redis database, message chain:`);
        const {msg, text, author} = JSON.parse(content) as {[key: string]: string};
        let cacheId = data.id;
        const retweetRef = (data.referenced_tweets || []).find(ref => ref.type === 'retweeted');
        if (retweetRef) cacheId += `,rt:${retweetRef.id}`;
        logger.info(JSON.stringify(Message.parseCQCode(msg)));
        sendTweets(cacheId, Message.parseCQCode(msg), text, author);
        // Served from cache: nothing left for the webshot service to do.
        return null as Tweet;
      })
      .catch(() => {
        // Redis is optional; without this guard a cache miss on a redis-less
        // setup would throw here and reject the whole batch.
        if (this.redis) this.redis.startProcess(`webshot/${data.id}`);
        return {data, includes} as Tweet;
      })
  )).then(pendingTweets => this.webshot(
    pendingTweets.filter(t => t),
    (cacheId: string, msg: string, text: string, author: string) => {
      Promise.resolve()
        .then(() => {
          if (!this.redis) return;
          const [twid, rtid] = cacheId.split(',rt:');
          logger.info(`caching webshot of tweet ${twid} to redis database`);
          this.redis.cacheContent(`webshot/${twid}`,
            JSON.stringify({msg: Message.toCQCode(msg), text, author, rtid})
          ).then(() => this.redis.finishProcess(`webshot/${twid}`));
        })
        .then(() => sendTweets(cacheId, msg, text, author));
    },
    this.webshotDelay
  ));

  /**
   * For a retweet, looks up the retweeted tweet and merges its references
   * and media into the given tweet; other tweets are returned unchanged.
   */
  private handleRetweet = (tweet: Tweet) => {
    const retweetRef = (tweet.data.referenced_tweets || []).find(ref => ref.type === 'retweeted');
    if (retweetRef) return this.client.v2.singleTweet(retweetRef.id, v2SingleParams)
      .then(({data: {referenced_tweets}, includes: {media}}) => ({
        ...tweet,
        data: {
          ...tweet.data,
          referenced_tweets: [retweetRef, ...(referenced_tweets || [])],
        },
        includes: {
          ...tweet.includes,
          media
        }
      }) as Tweet);
    return Promise.resolve(tweet);
  };

  /**
   * Loads one tweet — from the redis webshot cache when possible, otherwise
   * from the API — and passes it to `workOnTweets`.
   *
   * @param sender  callback that delivers the rendered tweet.
   * @param refresh when `true`, the cache is not consulted.
   * @throws (rejects with) the first API error when the API returns no data.
   */
  public getTweet = (
    id: string,
    sender: (cacheId: string, msg: string, text: string, author: string) => void,
    refresh = false
  ) =>
    ((this.redis && !refresh) ?
      this.redis.waitForProcess(`webshot/${id}`, this.webshotDelay * 4)
        .then(() => this.redis.getContent(`webshot/${id}`))
        .then(content => {
          if (content === null) throw Error();
          const {rtid} = JSON.parse(content);
          // Minimal placeholder: enough for workOnTweets to find the cache entry.
          return {data: {id, ...rtid && {referenced_tweets: [{type: 'retweeted', id: rtid}]}}} as Tweet;
        }) :
      Promise.reject()
    )
      .catch(() => this.client.v2.singleTweet(id, v2SingleParams))
      .then((tweet: Tweet) => {
        // Same convention as queryUser/get: surface the API error instead of
        // failing later on a missing `data`.
        if (!tweet.data) {
          throw (tweet.errors && tweet.errors[0]) || Error(`no data returned for tweet ${id}`);
        }
        if (tweet.data.text) {
          logger.debug(`api returned tweet ${JSON.stringify(tweet)} for query id=${id}`);
          return this.handleRetweet(tweet);
        } else {
          logger.debug(`skipped querying api as this tweet has been cached`);
        }
        return tweet;
      })
      .then((tweet: Tweet) => this.workOnTweets([tweet], sender, refresh));

  /**
   * Builds the delivery callback for a set of chats. For each chat the
   * message chain is sent with retries, falling back to plain text after
   * `maxTrials` failed retries; with redis configured, tweets already pushed
   * to a chat are skipped (or reported, when `reportOnSkip` is set) unless
   * `force` is set.
   */
  private sendTweets = (
    config: {sourceInfo?: string, reportOnSkip?: boolean, force?: boolean}
      = {reportOnSkip: false, force: false},
    ...to: IChat[]
  ) => (id: string, msg: string, text: string, author: string) => {
    const [twid, rtid] = id.split(',rt:');
    const {sourceInfo: source, reportOnSkip, force} = config;
    to.forEach(subscriber => {
      const targetStr = JSON.stringify(subscriber);
      const send = (outMsg: string, outText: string) => retryOnError(
        () => this.bot.sendTo(subscriber, outMsg),
        (_, count, terminate: (doNothing: Promise<void>) => void) => {
          if (count <= maxTrials) {
            logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
          } else {
            logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
            terminate(this.bot.sendTo(subscriber, author + outText, true));
          }
        }
      ).then(() => {
        if (this.redis) {
          logger.info(`caching push status of tweet ${rtid ? `${rtid} (RTed as ${twid})` : twid} for ${targetStr}...`);
          return this.redis.cacheForChat(rtid || twid, subscriber);
        }
      });
      ((this.redis && !force) ? this.redis.isCachedForChat(rtid || twid, subscriber) : Promise.resolve(false))
        .then(isCached => {
          // Per-subscriber copies: the "recently sent" notice for one chat
          // must not replace the content delivered to the others.
          let outMsg = msg;
          let outText = text;
          if (isCached) {
            logger.info(`skipped subscriber ${targetStr} as tweet ${rtid ? `${rtid} (or its RT)` : twid} has been sent already`);
            if (!reportOnSkip) return;
            outText = `[最近发送过的推文:${rtid || twid}]`;
            outMsg = author + outText;
          }
          logger.info(`pushing data${source ? ` of ${source}` : ''} to ${targetStr}`);
          return send(outMsg, outText);
        });
    });
  };

  /**
   * Queries a paginated v2 endpoint (`userTimeline` or `listTweets`),
   * follows pages until a stop condition is met and returns the tweets with
   * their media and author attached (retweets resolved via `handleRetweet`).
   *
   * @throws (rejects with) the first API error when a page carries errors
   *         and no data.
   */
  private get = <T extends 'userTimeline' | 'listTweets'>(
    type: T, targetId: string, params: Parameters<typeof this.client.v2[T]>[1]
  ) => {
    const {since_id, max_results} = (params as Twitter.TweetV2UserTimelineParams);
    const getMore = (res: Twitter.TweetUserTimelineV2Paginator | Twitter.TweetV2ListTweetsPaginator) => {
      if (res.errors && res.errors.length > 0) {
        const [err] = res.errors;
        if (!res.data) throw err;
        if (err.title === 'Authorization Error') {
          logger.warn(`non-fatal error while querying ${type} with id ${targetId}, error: ${err.detail}`);
        }
      }
      if (!res.meta.next_token || // at last page
        BigNumOps.compare(res.tweets.slice(-1)[0].id, since_id || '0') !== 1 || // at specified boundary
        !since_id && res.meta.result_count >= max_results // at specified max count
      ) return res;
      return res.fetchNext().then(getMore);
    };
    // since_id is emulated by the boundary check above for list queries.
    if (type === 'listTweets') delete (params as any).since_id;
    return this.client.v2[type](targetId, params).then(getMore)
      .then(({includes, tweets}) => tweets.map((tweet): Tweet =>
        ({
          data: tweet as TweetObject,
          includes: {
            media: includes.medias(tweet) as MediaObject[],
            users: [includes.author(tweet)]
          }
        })
      ))
      .then(tweets => Promise.all(tweets.map(this.handleRetweet)));
  };

  /**
   * One polling pass: picks the feed at `lock.workon`, fetches its new
   * tweets, pushes media tweets to the feed's subscribers, updates the
   * feed's offset, persists the lock file and schedules the next pass.
   * Feeds without subscribers are removed from the lock.
   */
  public work = () => {
    const lock = this.lock;
    if (this.workInterval < 1) this.workInterval = 1;
    if (lock.feed.length === 0) {
      setTimeout(() => {
        this.work();
      }, this.workInterval * 1000);
      return;
    }
    if (lock.workon >= lock.feed.length) lock.workon = 0;
    if (!lock.threads[lock.feed[lock.workon]] ||
      !lock.threads[lock.feed[lock.workon]].subscribers ||
      lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
      logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
      delete lock.threads[lock.feed[lock.workon]];
      lock.feed.splice(lock.workon, 1);
      fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
      this.work();
      return;
    }

    const currentFeed = lock.feed[lock.workon];
    logger.debug(`pulling feed ${currentFeed}`);

    const promise = new Promise<Tweet[]>(resolve => {
      let job = Promise.resolve();
      let id = lock.threads[currentFeed].id;
      let endpoint: Parameters<typeof this.get>[0];

      let match = /https:\/\/twitter.com\/([^\/]+)\/lists\/([^\/]+)/.exec(currentFeed);
      if (match) {
        endpoint = 'listTweets';
        if (match[1] === 'i') {
          // `/i/lists/<id>` links carry the list ID directly.
          id = match[2];
        } else if (id === undefined) {
          job = job.then(() => this.client.v1.list({
            owner_screen_name: match[1],
            slug: match[2],
          })).then(({id_str}) => {
            lock.threads[currentFeed].id = id = id_str;
          });
        }
      } else {
        match = /https:\/\/twitter.com\/([^\/]+)/.exec(currentFeed);
        if (match) {
          endpoint = 'userTimeline';
          if (id === undefined) {
            job = job.then(() => this.queryUser(
              match[1].replace(/^@?(.*)$/, '$1')
            )).then(userNameId => {
              lock.threads[currentFeed].id = id = userNameId.split(':')[1];
            });
          }
        }
      }

      const offset = lock.threads[currentFeed].offset;
      job.then(() => this.get(endpoint, id, {
        ...v2SingleParams,
        max_results: 20,
        exclude: ['retweets'],
        // A positive offset is the newest seen ID; an offset below -1 is a
        // negated ID used as the upper bound while searching backwards.
        ...(offset as unknown as number > 0) && {since_id: offset},
        ...(offset as unknown as number < -1) && {until_id: offset.slice(1)},
      })).catch((err: Twitter.InlineErrorV2) => {
        if (err.title === 'Not Found Error') {
          logger.warn(`error on fetching tweets for ${currentFeed}: ${showApiError(err)}`);
          lock.threads[currentFeed].subscribers.forEach(subscriber => {
            logger.info(`sending notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}`);
            this.bot.sendTo(subscriber, `链接 ${currentFeed} 指向的用户或列表不存在,请退订。`)
              .catch(sendErr => {
                // Best effort: a failed notice must not interrupt polling.
                logger.warn(`error sending notfound message of ${currentFeed}: ${showApiError(sendErr)}`);
              });
          });
        } else {
          logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${showApiError(err)}`);
        }
        return [] as Tweet[];
      }).then(resolve);
    });

    promise.then((tweets: Tweet[]) => {
      logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
      const currentThread = lock.threads[currentFeed];

      const setOffset = (offset: string) => currentThread.offset = offset;
      const updateDate = () => currentThread.updatedAt = new Date().toString();
      if (tweets.length === 0) {
        if (currentThread.offset as unknown as number < -1) {
          setOffset(BigNumOps.plus('1', currentThread.offset));
        }
        updateDate();
        return;
      }

      // Follow screen-name changes. `currentUser` is undefined for list
      // feeds, whose thread ID is the list's rather than a user's.
      const currentUser = tweets[0].includes.users.find(user => user.id === currentThread.id);
      if (currentUser && currentUser.username !== parseLink(currentFeed)[0]) {
        const renamedFeed = linkBuilder(currentUser.username);
        if (!lock.threads[renamedFeed]) {
          // Move the thread along with the feed entry; otherwise the renamed
          // feed would have no thread and be dropped on the next pass.
          lock.feed[lock.workon] = renamedFeed;
          lock.threads[renamedFeed] = currentThread;
          delete lock.threads[currentFeed];
        }
      }

      const topOfFeed = tweets[0].data.id;
      logger.info(`current offset: ${currentThread.offset}, current top of feed: ${topOfFeed}`);
      const bottomOfFeed = tweets.slice(-1)[0].data.id;
      const updateOffset = () => setOffset(topOfFeed);
      tweets = tweets.filter(({data}) => (data.attachments || {}).media_keys);
      logger.info(`found ${tweets.length} tweets with extended entities`);
      if (currentThread.offset === '-1') {
        updateOffset();
        return;
      }
      if (currentThread.offset as unknown as number <= 0) {
        if (tweets.length === 0) {
          // No media tweet on this page: continue below it on the next pass
          // and stay on the same feed (workon is re-incremented below).
          setOffset(BigNumOps.plus('1', '-' + bottomOfFeed));
          lock.workon--;
          return;
        }
        tweets.splice(1);
      }
      if (tweets.length === 0) {
        updateDate();
        updateOffset();
        return;
      }

      return this.workOnTweets(tweets, this.sendTweets({sourceInfo: `thread ${currentFeed}`}, ...currentThread.subscribers))
        .then(updateDate).then(updateOffset);
    })
      .catch(err => {
        // Keep polling: one failing feed must not stop the scheduler.
        logger.error(`unhandled error while processing feed ${currentFeed}: ${showApiError(err)}`);
      })
      .then(() => {
        lock.workon++;
        let timeout = this.workInterval * 1000 / lock.feed.length;
        if (timeout < 1000) timeout = 1000;
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        setTimeout(() => {
          this.work();
        }, timeout);
      });
  };
}