import * as fs from 'fs';
import * as path from 'path';

import {
  IgApiClient,
  IgClientError,
  IgExactUserNotFoundError,
  IgLoginRequiredError,
  IgNetworkError,
  ReelsMediaFeedResponseItem,
  UserFeedResponseUser
} from 'instagram-private-api';
import { RequestError } from 'request-promise/errors';
import { SocksProxyAgent } from 'socks-proxy-agent';

import { getLogger } from './loggers';
import QQBot, { Message } from './koishi';
import { BigNumOps } from './utils';
import Webshot from './webshot';

/**
 * Extracts the user name (and the story id, for story links) from an
 * Instagram link or from a bare user name.
 *
 * The user name is passed through `ScreenNameNormalizer.normalize`, i.e. it is
 * lower-cased and stripped of a leading `@`.
 *
 * @param link an `instagram.com/stories/<user>/<id>` link, an
 *   `instagram.com/<user>` link, or a bare user name
 * @returns the parsed parts, or `undefined` when nothing matches
 */
const parseLink = (link: string): {userName?: string, storyId?: string} => {
  let match = /instagram\.com\/stories\/([^\/?#]+)\/(\d+)/.exec(link);
  if (match) return {userName: ScreenNameNormalizer.normalize(match[1]).split(':')[0], storyId: match[2]};
  match = /instagram\.com\/([^\/?#]+)/.exec(link) || /^([^\/?#]+)$/.exec(link);
  if (match) return {userName: ScreenNameNormalizer.normalize(match[1]).split(':')[0]};
  return;
};

/**
 * Builds the canonical Instagram URL for a parsed link.
 *
 * @param config result of `parseLink`
 * @returns the profile URL, the story URL when `storyId` is set, or
 *   `undefined` when `userName` is missing
 */
const linkBuilder = (config: ReturnType<typeof parseLink>): string => {
  if (!config.userName) return;
  if (!config.storyId) return `https://www.instagram.com/${config.userName}/`;
  return `https://www.instagram.com/stories/${config.userName}/${config.storyId}/`;
};

export {linkBuilder, parseLink};

/** Options consumed by the worker's constructor. */
interface IWorkerOption {
  sessionLockfile: string;
  credentials: [string, string];
  proxyUrl: string;
  lock: ILock;
  lockfile: string;
  webshotCookiesLockfile: string;
  bot: QQBot;
  workInterval: number;
  webshotDelay: number;
  mode: number;
  wsUrl: string;
}

/**
 * Persists and restores the Instagram client state in a JSON file and logs in
 * with the given credentials when no stored state exists.
 */
export class SessionManager {
  private ig: IgApiClient;
  private username: string;
  private password: string;
  private lockfile: string;

  /**
   * @param client the API client whose state is managed
   * @param file path of the file the serialized state is stored in
   * @param credentials `[username, password]` pair used for logging in
   */
  constructor(client: IgApiClient, file: string, credentials: [string, string]) {
    this.ig = client;
    this.lockfile = file;
    [this.username, this.password] = credentials;
  }

  /**
   * Generates the device for the user, subscribes `save` to the client's
   * `end$` stream, then restores the stored state if the state file exists or
   * logs in otherwise.
   *
   * A failure to read, parse or deserialize the stored state is logged and
   * swallowed; no login is attempted in that case.
   */
  public init = () => {
    this.ig.state.generateDevice(this.username);
    this.ig.request.end$.subscribe(() => { this.save(); });
    const filePath = path.resolve(this.lockfile);
    if (!fs.existsSync(filePath)) return this.login();
    // Everything runs inside the promise chain so that an asynchronous
    // rejection of `deserialize` reaches the same handler as a synchronous
    // read/parse error (a surrounding try/catch would miss the former).
    return Promise.resolve()
      .then(() => {
        const serialized = JSON.parse(fs.readFileSync(filePath, 'utf8')) as {[key: string]: any};
        return this.ig.state.deserialize(serialized);
      })
      .then(() => {
        logger.info(`successfully loaded client session cookies for user ${this.username}`);
      })
      .catch(err => {
        logger.error(`failed to load client session cookies from file ${this.lockfile}: `, err);
      });
  };

  /**
   * Runs the pre-login flow, logs in, then resolves with the result of the
   * post-login flow, which is started on the next tick.
   */
  public login = () =>
    this.ig.simulate.preLoginFlow()
      .then(() => this.ig.account.login(this.username, this.password))
      .then(() => new Promise(resolve => {
        logger.info(`successfully logged in as ${this.username}`);
        process.nextTick(() => resolve(this.ig.simulate.postLoginFlow()));
      }));

  /** Serializes the client state (minus `constants`) into the state file. */
  public save = () =>
    this.ig.state.serialize()
      .then((serialized: {[key: string]: any}) => {
        delete serialized.constants;
        return fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(serialized, null, 2), 'utf-8');
      });
}

/** Helpers turning user input into the `name:` / `name:id` form used in this module. */
export class ScreenNameNormalizer {

  /** Live user lookup; assigned by the worker's constructor. */
  // tslint:disable-next-line: variable-name
  public static _queryUser: (username: string) => Promise<string>;

  /** Lower-cases the name, strips a leading `@` and appends `:`. */
  public static normalize = (username: string) => `${username.toLowerCase().replace(/^@/, '')}:`;

  /**
   * Resolves the name through `_queryUser` when it is set.
   *
   * @returns the lookup result; `null` when the lookup failed with
   *   `IgExactUserNotFoundError`; `${username}:` when it failed with any
   *   other error; the offline `normalize` result when no lookup is set
   */
  public static async normalizeLive(username: string) {
    if (this._queryUser) {
      return await this._queryUser(username)
        .catch((err: IgClientError) => {
          if (!(err instanceof IgExactUserNotFoundError)) {
            logger.warn(`error looking up user: ${err.message}`);
            return `${username}:`;
          }
          return null;
        });
    }
    return this.normalize(username);
  }
}

/**
 * Sends all stories of a user to a chat. This placeholder throws; the
 * worker's constructor replaces it with the real implementation.
 */
export let sendAllStories = (segmentId: string, receiver: IChat): void => {
  throw Error();
};

export type MediaItem = ReelsMediaFeedResponseItem;

const logger = getLogger('instagram');
const maxTrials = 3;
const retryInterval = 1500;

/** Formats `n` with its English ordinal suffix (1st, 2nd, 3rd, 11th, ...). */
const ordinal = (n: number) => {
  // A tens digit of 1 (11, 12, 13, 111, ...) always takes "th".
  switch ((Math.trunc(n / 10) % 10 === 1) ? 0 : n % 10) {
    case 1:
      return `${n}st`;
    case 2:
      return `${n}nd`;
    case 3:
      return `${n}rd`;
    default:
      return `${n}th`;
  }
};

/**
 * Runs `doWork` and, on rejection, retries it every `retryInterval` ms until
 * it succeeds or `onRetry` terminates the loop.
 *
 * @param doWork the operation to attempt
 * @param onRetry called before each retry with the last error and the
 *   1-based retry count; calling `terminate(value)` stops retrying and
 *   resolves the returned promise with `value`
 * @returns a promise resolving with the result of `doWork` or with the value
 *   given to `terminate`; it never rejects
 */
const retryOnError = <T, U>(
  doWork: () => Promise<T>,
  onRetry: (error: unknown, count: number, terminate: (defaultValue: U) => void) => void
) => new Promise<T | U>(resolve => {
  const retry = (reason: unknown, count: number) => {
    setTimeout(() => {
      let terminate = false;
      onRetry(reason, count, defaultValue => {
        terminate = true;
        resolve(defaultValue);
      });
      if (!terminate) doWork().then(resolve).catch(error => retry(error, count + 1));
    }, retryInterval);
  };
  doWork().then(resolve).catch(error => retry(error, 1));
});

/**
 * Worker that keeps a cache of Instagram stories for the subscribed users and
 * pushes new stories to the subscribers through the bot.
 */
export default class {

  private client: IgApiClient;
  private lock: ILock;
  private lockfile: string;
  private workInterval: number;
  private bot: QQBot;
  private webshotDelay: number;
  private webshot: Webshot;
  private mode: number;
  private wsUrl: string;

  public session: SessionManager;

  /**
   * Creates the API client (optionally behind a SOCKS proxy) and the session
   * manager, and installs the live implementations of
   * `ScreenNameNormalizer._queryUser` and `sendAllStories`.
   */
  constructor(opt: IWorkerOption) {
    this.client = new IgApiClient();
    if (opt.proxyUrl) {
      try {
        const url = new URL(opt.proxyUrl);
        if (!/^socks(?:4a?|5h?)?:$/.test(url.protocol)) throw Error();
        if (!url.port) url.port = '1080';
        this.client.request.defaults.agent = new SocksProxyAgent({
          hostname: url.hostname,
          port: url.port,
          userId: url.username,
          password: url.password,
        });
      } catch (e) {
        logger.warn(`invalid socks proxy url: ${opt.proxyUrl}, ignoring`);
      }
    }
    this.session = new SessionManager(this.client, opt.sessionLockfile, opt.credentials);
    this.lockfile = opt.lockfile;
    this.lock = opt.lock;
    this.workInterval = opt.workInterval;
    this.bot = opt.bot;
    this.webshotDelay = opt.webshotDelay;
    this.mode = opt.mode;
    this.wsUrl = opt.wsUrl;
    ScreenNameNormalizer._queryUser = this.queryUser;
    sendAllStories = (rawUserName, receiver) => {
      const sender = this.sendStories(`instagram stories for ${rawUserName}`, receiver);
      this.queryUser(rawUserName)
        .then(userNameId => {
          const [userName, userId] = userNameId.split(':');
          // Serve from the cache when it already holds stories of this user.
          if (userName in this.cache && Object.keys(this.cache[userName].stories).length > 0) {
            return Promise.resolve(
              Object.values(this.cache[userName].stories)
                .map(story => ({...story, user: this.cache[userName].user}))
                .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk))
            );
          }
          return this.client.feed.reelsMedia({userIds: [userId]}).items()
            .then(storyItems => {
              storyItems = storyItems.map(story => ({...story, user: this.cache[userName].user}));
              storyItems.forEach(item => {
                if (!(item.pk in this.cache[userName].stories)) {
                  this.cache[userName].stories[item.pk] = item;
                }
              });
              if (storyItems.length === 0) this.bot.sendTo(receiver, `当前用户 (@${userName}) 没有可用的推特故事。`);
              return storyItems;
            });
        })
        .then(storyItems => this.workOnMedia(storyItems, sender))
        .catch((error: IgClientError & Partial<RequestError>) => {
          if (error instanceof IgNetworkError) {
            logger.warn(`error on fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
            this.bot.sendTo(receiver, `获取 Fleets 时出现错误:原因: ${error.cause}`);
          } else if (error instanceof IgLoginRequiredError) {
            logger.warn('login required, logging in again...');
            this.session.login().then(() => sendAllStories(rawUserName, receiver));
          } else {
            logger.error(`unhandled error on fetching media for ${rawUserName}: ${error}`);
            this.bot.sendTo(receiver, `获取 Fleets 时发生未知错误: ${error}`);
          }
        });
    };
  }

  /**
   * Creates the `Webshot` instance; the callback handed to it schedules the
   * first cache refresh, the first `work` run and the periodic refresh.
   */
  public launch = () => {
    this.webshot = new Webshot(
      this.wsUrl,
      this.mode,
      () => {
        setTimeout(this.workForAll, this.workInterval * 1000);
        setTimeout(() => {
          this.work();
          // NOTE(review): the factor here is 10000 while the other timers use
          // 1000/1200 -- confirm the tenfold refresh period is intended.
          setInterval(this.workForAll, this.workInterval * 10000);
        }, this.workInterval * 1200);
      }
    );
  };

  /**
   * Resolves a raw user name to `username:pk`, from the cache when possible
   * and through an exact user search otherwise (which also creates the cache
   * entry for that user).
   */
  public queryUser = (rawUserName: string) => {
    const username = ScreenNameNormalizer.normalize(rawUserName).split(':')[0];
    if (username in this.cache) {
      return Promise.resolve(`${username}:${this.cache[username].user.pk}`);
    }
    return this.client.user.searchExact(username)
      .then(user => {
        this.cache[user.username] = {user, stories: {}};
        return `${user.username}:${user.pk}`;
      });
  };

  /** Hands the media items and the send callback to the webshot instance. */
  private workOnMedia = (
    mediaItems: MediaItem[],
    sendMedia: (msg: string, text: string, author: string) => void
  ) => this.webshot(mediaItems, sendMedia, this.webshotDelay);

  /**
   * Creates a send callback that pushes a message to every chat in `to`.
   *
   * Each send is retried on failure; once the retry count exceeds
   * `maxTrials`, `author + text` is sent as a fallback instead of `msg`.
   *
   * @param source description of the data origin, used for logging only
   * @param to the chats to send to
   */
  private sendStories = (source?: string, ...to: IChat[]) =>
    (msg: string, text: string, author: string) => {
      to.forEach(subscriber => {
        logger.info(`pushing data${source ? ` of ${Message.ellipseBase64(source)}` : ''} to ${JSON.stringify(subscriber)}`);
        retryOnError(
          () => this.bot.sendTo(subscriber, msg),
          (_, count, terminate: (fallback: Promise<unknown>) => void) => {
            if (count <= maxTrials) {
              logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
            } else {
              logger.warn(`${count - 1} consecutive failures while sending ` +
                'message chain, trying plain text instead...');
              terminate(this.bot.sendTo(subscriber, author + text));
            }
          });
      });
    };

  /** Per-user cache: the user object plus the known stories keyed by story pk. */
  private cache: {
    [userName: string]: {
      user: UserFeedResponseUser & ReelsMediaFeedResponseItem['user'],
      stories: {[storyId: string]: MediaItem},
    },
  } = {};

  /**
   * Refreshes the story cache for every thread in the lock: makes sure each
   * user has a cache entry, then pulls the stories of all of them in one
   * request and stores the ones not yet cached.
   */
  private workForAll = () => {
    const idToUserMap: {[id: number]: UserFeedResponseUser} = {};
    Promise.all(Object.entries(this.lock.threads).map(entry => {
      const id = entry[1].id;
      const userName = parseLink(entry[0]).userName;
      logger.debug(`preparing to add user @${userName} to next pull task...`);
      if (userName in this.cache) return Promise.resolve(idToUserMap[id] = this.cache[userName].user);
      return this.client.user.info(id).then(user => {
        logger.debug(`initialized cache item for user ${user.full_name} (@${userName})`);
        this.cache[userName] = {user, stories: {}};
        return idToUserMap[id] = user as UserFeedResponseUser;
      });
    }))
      .then(() => {
        logger.debug(`pulling stories for users: ${Object.values(idToUserMap).map(user => user.username)}`);
        // Returned so that the single catch below also covers failures of
        // the user-info lookups above, not just of this request.
        return this.client.feed.reelsMedia({
          userIds: Object.keys(idToUserMap),
        }).items();
      })
      .then(storyItems => storyItems.forEach(item => {
        if (!(item.pk in this.cache[idToUserMap[item.user.pk].username].stories)) {
          this.cache[idToUserMap[item.user.pk].username].stories[item.pk] = item;
        }
      }))
      .catch((error: IgClientError & Partial<RequestError>) => {
        if (error instanceof IgNetworkError) {
          logger.warn(`error on fetching stories for all: ${JSON.stringify(error.cause)}`);
        } else if (error instanceof IgLoginRequiredError) {
          logger.warn('login required, logging in again...');
          this.session.login().then(this.workForAll);
        } else {
          logger.error(`unhandled error on fetching media for all: ${error}`);
        }
      });
  };

  /**
   * Processes one feed per invocation and reschedules itself.
   *
   * Feeds without subscribers are removed from the lock. For the current
   * feed, cached stories newer than the thread's offset are sent to its
   * subscribers; afterwards `workon` advances, the lock is written to the
   * lock file and the next run is scheduled.
   */
  public work = () => {
    const lock = this.lock;
    logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
    if (this.workInterval < 1) this.workInterval = 1;
    if (lock.feed.length === 0) {
      setTimeout(this.work, this.workInterval * 1000);
      return;
    }
    if (lock.workon >= lock.feed.length) lock.workon = 0;
    if (!lock.threads[lock.feed[lock.workon]] ||
      !lock.threads[lock.feed[lock.workon]].subscribers ||
      lock.threads[lock.feed[lock.workon]].subscribers.length === 0) {
      logger.warn(`nobody subscribes thread ${lock.feed[lock.workon]}, removing from feed`);
      delete lock.threads[lock.feed[lock.workon]];
      lock.feed.splice(lock.workon, 1);
      fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
      this.work();
      return;
    }

    const currentFeed = lock.feed[lock.workon];
    logger.debug(`searching for new items from ${currentFeed} in cache`);

    // Collects the cached stories newer than the thread offset, newest first.
    // Always returns an array so the chain below always reaches its tail and
    // `work` is rescheduled exactly once per run.
    const collectNewItems = (): MediaItem[] => {
      const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(currentFeed);
      if (!match) {
        logger.warn(`cannot extract user name from feed ${currentFeed}, skipping`);
        return [];
      }
      const cachedFeed = this.cache[match[1]];
      if (!cachedFeed) {
        logger.debug(`no cache entry for ${currentFeed} yet, skipping`);
        return [];
      }
      const newer = (item: MediaItem) => BigNumOps.compare(item.pk, lock.threads[currentFeed].offset) > 0;
      return Object.values(cachedFeed.stories)
        .filter(newer)
        .map(story => ({...story, user: cachedFeed.user}))
        .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk));
    };

    Promise.resolve()
      .then(collectNewItems)
      .then((mediaItems: MediaItem[]) => {
        const currentThread = lock.threads[currentFeed];

        const updateDate = () => currentThread.updatedAt = new Date().toString();
        if (!mediaItems || mediaItems.length === 0) { updateDate(); return; }

        const topOfFeed = mediaItems[0].pk;
        const updateOffset = () => currentThread.offset = topOfFeed;

        // Offset '-1': only record the newest item, send nothing.
        if (currentThread.offset === '-1') { updateOffset(); return; }
        // Offset '0': send only the newest item.
        if (currentThread.offset === '0') mediaItems.splice(1);

        return this.workOnMedia(mediaItems, this.sendStories(`thread ${currentFeed}`, ...currentThread.subscribers))
          .then(updateDate).then(updateOffset);
      })
      .catch((error: unknown) => {
        // Keep polling: the offset was not advanced, so the same items are
        // picked up again on a later run.
        logger.error(`unhandled error while working on ${currentFeed}: ${error}`);
      })
      .then(() => {
        lock.workon++;
        let timeout = this.workInterval * 1000 / lock.feed.length;
        if (timeout < 1000) timeout = 1000;
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        setTimeout(() => {
          this.work();
        }, timeout);
      });
  };
}