twitter.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. import * as fs from 'fs';
  2. import * as path from 'path';
  3. import {
  4. IgApiClient,
  5. IgClientError, IgExactUserNotFoundError,
  6. IgLoginRequiredError,
  7. IgNetworkError,
  8. ReelsMediaFeedResponseItem, UserFeedResponseUser
  9. } from 'instagram-private-api';
  10. import { RequestError } from 'request-promise/errors';
  11. import { SocksProxyAgent } from 'socks-proxy-agent';
  12. import { getLogger } from './loggers';
  13. import QQBot, { Message } from './koishi';
  14. import { BigNumOps } from './utils';
  15. import Webshot from './webshot';
  /**
   * Extracts the account name (and, for story links, the story id) from an
   * Instagram link or from a bare user name.
   *
   * Recognised inputs, tried in this order:
   *  - `instagram.com/stories/<user>/<digits>` -> `{userName, storyId}`
   *  - `instagram.com/<user>`                  -> `{userName}`
   *  - a string without `/`, `?` or `#`        -> `{userName}`
   *
   * @param link link or user name to parse
   * @returns the parsed parts, or `undefined` when nothing matches
   */
  const parseLink = (link: string): {userName?: string, storyId?: string} => {
    // `normalize` yields "<name>:", so only the part before the colon is kept.
    const toUserName = (raw: string) => ScreenNameNormalizer.normalize(raw).split(':')[0];

    const storyMatch = /instagram\.com\/stories\/([^\/?#]+)\/(\d+)/.exec(link);
    if (storyMatch) {
      const [, rawName, storyId] = storyMatch;
      return {userName: toUserName(rawName), storyId};
    }

    const profileMatch = /instagram\.com\/([^\/?#]+)/.exec(link) || /^([^\/?#]+)$/.exec(link);
    return profileMatch ? {userName: toUserName(profileMatch[1])} : undefined;
  };
  /**
   * Builds the instagram.com URL for a parsed link.
   *
   * @param config result of `parseLink`; may be `undefined`, because `parseLink`
   *   returns nothing for input it cannot parse
   * @returns the profile URL when only a user name is present, the story URL
   *   when a story id is present as well, or `undefined` when there is no user
   *   name to build from
   */
  const linkBuilder = (config: ReturnType<typeof parseLink>): string => {
    // Guard against a missing config as well as a missing name: dereferencing
    // an undefined parse result would throw a TypeError.
    if (!config || !config.userName) return;
    const {userName, storyId} = config;
    if (!storyId) return `https://www.instagram.com/${userName}/`;
    return `https://www.instagram.com/stories/${userName}/${storyId}/`;
  };
  31. export {linkBuilder, parseLink};
  /** Options accepted by the constructor of the default-exported worker class. */
  interface IWorkerOption {
    // --- Instagram client ---
    /** `[username, password]` handed to the `SessionManager`. */
    credentials: [string, string];
    /** File the `SessionManager` reads the serialized client state from and writes it to. */
    sessionLockfile: string;
    /** SOCKS proxy URL for the client; skipped when falsy, ignored with a warning when invalid. */
    proxyUrl: string;

    // --- subscription state ---
    /** Shared lock object (`feed`, `threads`, `workon`) the worker polls and mutates. */
    lock: ILock;
    /** File the lock object is written to as JSON. */
    lockfile: string;

    // --- delivery ---
    /** Bot used to send messages to chats. */
    bot: QQBot;
    /** Passed to the `Webshot` constructor as its first argument. */
    wsUrl: string;
    /** Passed to the `Webshot` constructor as its second argument. */
    mode: number;
    /** Forwarded as the last argument of every webshot call. */
    webshotDelay: number;
    /** Not read by the worker class in this file. */
    webshotCookiesLockfile: string;

    // --- scheduling ---
    /** Polling interval; the worker multiplies it by 1000 for its timers, i.e. it is in seconds. */
    workInterval: number;
  }
  /**
   * Keeps the state of an `IgApiClient` in a file so that a session can be
   * restored instead of logging in again.
   */
  export class SessionManager {
    private ig: IgApiClient;
    private username: string;
    private password: string;
    private lockfile: string;

    /**
     * @param client client whose state is persisted
     * @param file path of the session file
     * @param credentials `[username, password]` used by `login`
     */
    constructor(client: IgApiClient, file: string, credentials: [string, string]) {
      this.ig = client;
      this.lockfile = file;
      [this.username, this.password] = credentials;
    }

    /**
     * Generates the device for the user, arranges for the state to be saved
     * whenever the client's `request.end$` stream emits, and then either
     * restores the state from the session file or, when that file does not
     * exist, logs in.
     *
     * A session file that cannot be read, parsed or deserialized is reported
     * and otherwise ignored; the returned promise still resolves.
     *
     * @returns a promise settled once the restore or login has finished
     */
    public init = () => {
      this.ig.state.generateDevice(this.username);
      this.ig.request.end$.subscribe(() => {
        // save() is fire-and-forget here; without a handler a failed serialize
        // or write would surface as an unhandled rejection.
        this.save().catch((err: unknown) => {
          logger.error(`failed to save client session cookies to file ${this.lockfile}: `, err);
        });
      });

      const filePath = path.resolve(this.lockfile);
      if (!fs.existsSync(filePath)) return this.login();

      const reportLoadFailure = (err: unknown) => {
        logger.error(`failed to load client session cookies from file ${this.lockfile}: `, err);
      };
      try {
        const serialized = JSON.parse(fs.readFileSync(filePath, 'utf8')) as {[key: string]: any};
        // deserialize() is asynchronous, so its rejection is not seen by the
        // surrounding try/catch and needs its own handler.
        return this.ig.state.deserialize(serialized).then(() => {
          logger.info(`successfully loaded client session cookies for user ${this.username}`);
        }, reportLoadFailure);
      } catch (err) {
        reportLoadFailure(err);
        return Promise.resolve();
      }
    };

    /**
     * Runs the pre-login flow, logs in with the stored credentials and then
     * runs the post-login flow on the next tick.
     *
     * @returns a promise following the post-login flow
     */
    public login = () =>
      this.ig.simulate.preLoginFlow()
        .then(() => this.ig.account.login(this.username, this.password))
        .then(() => new Promise(resolve => {
          logger.info(`successfully logged in as ${this.username}`);
          process.nextTick(() => resolve(this.ig.simulate.postLoginFlow()));
        }));

    /**
     * Serializes the client state, drops its `constants` entry and writes the
     * rest to the session file as indented JSON.
     *
     * @returns a promise that rejects when serializing or writing fails
     */
    public save = () =>
      this.ig.state.serialize()
        .then((serialized: {[key: string]: any}) => {
          delete serialized.constants;
          return fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(serialized, null, 2), 'utf-8');
        });
  }
  /** Helpers that turn user input into the `"<name>:"` form used for lookups. */
  export class ScreenNameNormalizer {
    /** Optional live lookup; the worker class installs its `queryUser` here. */
    // tslint:disable-next-line: variable-name
    public static _queryUser: (username: string) => Promise<string>;

    /**
     * Lower-cases the name, strips a single leading `@` and appends `:`.
     *
     * @param username raw user name
     */
    public static normalize = (username: string) => {
      const bare = username.toLowerCase().replace(/^@/, '');
      return `${bare}:`;
    };

    /**
     * Resolves the name through `_queryUser` when one is installed and falls
     * back to `normalize` otherwise.
     *
     * @param username raw user name
     * @returns the lookup result; `null` when the lookup fails with
     *   `IgExactUserNotFoundError`; the raw `"<username>:"` after a warning
     *   for any other lookup error
     */
    public static async normalizeLive(username: string) {
      if (!this._queryUser) return this.normalize(username);

      const onLookupError = (err: IgClientError) => {
        if (err instanceof IgExactUserNotFoundError) return null;
        logger.warn(`error looking up user: ${err.message}`);
        return `${username}:`;
      };
      return await this._queryUser(username).catch(onLookupError);
    }
  }
  /**
   * Sends all stories of a user to a chat.
   *
   * This initial value only throws; the constructor of the default-exported
   * worker class reassigns the binding to the real implementation.
   *
   * @param segmentId user name (or link) whose stories are requested
   * @param receiver chat the stories are sent to
   */
  export let sendAllStories = (segmentId: string, receiver: IChat): void => {
    throw new Error();
  };
  /** A story entry as delivered by the reels media feed. */
  export type MediaItem = ReelsMediaFeedResponseItem;

  // Logger shared by everything in this module.
  const logger = getLogger('instagram');

  // Delay, in milliseconds, between two attempts made by retryOnError.
  const retryInterval = 1500;
  // Number of failed sends that are logged as retries before sendStories
  // falls back to plain text.
  const maxTrials = 3;
  /**
   * Formats a number with its English ordinal suffix (`1st`, `2nd`, `3rd`,
   * `4th`, ...). Numbers whose tens digit is 1 (11, 12, 13, 111, ...) always
   * get `th`.
   *
   * @param n number to format
   * @returns `n` followed by its suffix
   */
  const ordinal = (n: number) => {
    const tensDigitIsOne = Math.trunc(n / 10) % 10 === 1;
    const lastDigit = tensDigitIsOne ? 0 : n % 10;

    let suffix = 'th';
    if (lastDigit === 1) suffix = 'st';
    else if (lastDigit === 2) suffix = 'nd';
    else if (lastDigit === 3) suffix = 'rd';
    return `${n}${suffix}`;
  };
  /**
   * Runs `doWork` and keeps retrying it after failures until it succeeds or
   * the caller gives up.
   *
   * After each failure the function waits `interval` milliseconds and calls
   * `onRetry` with the failure reason and the number of failures so far
   * (starting at 1). If `onRetry` calls `terminate(value)`, the returned
   * promise is resolved with `value` and no further attempt is made;
   * otherwise `doWork` is run again.
   *
   * @param doWork operation to attempt
   * @param onRetry called before every retry; may stop the loop via `terminate`
   * @param interval delay between attempts in milliseconds; defaults to the
   *   module-level `retryInterval`
   * @returns a promise for the result of `doWork` or the value given to `terminate`
   */
  const retryOnError = <T, U>(
    doWork: () => Promise<T>,
    onRetry: (error: unknown, count: number, terminate: (defaultValue: U) => void) => void,
    interval = retryInterval
  ) => new Promise<T | U>(resolve => {
    const attempt = (failuresIncludingThis: number) => {
      let pending: Promise<T>;
      try {
        pending = doWork();
      } catch (error) {
        // A synchronous throw inside the timer callback would otherwise be an
        // uncaught exception and leave the returned promise pending forever.
        pending = Promise.reject(error);
      }
      pending.then(resolve).catch(error => scheduleRetry(error, failuresIncludingThis));
    };

    const scheduleRetry = (reason: unknown, count: number) => {
      setTimeout(() => {
        let terminated = false;
        onRetry(reason, count, defaultValue => { terminated = true; resolve(defaultValue); });
        if (!terminated) attempt(count + 1);
      }, interval);
    };

    attempt(1);
  });
  /**
   * Instagram story worker.
   *
   * It keeps an in-memory cache of users and their stories, refreshes that
   * cache for every subscribed thread (`workForAll`), and walks the feed list
   * of the shared lock one entry at a time (`work`), pushing stories newer than
   * the thread's offset to its subscribers through the webshot and the bot.
   * Constructing an instance also installs the live implementations of
   * `ScreenNameNormalizer._queryUser` and `sendAllStories`.
   */
  export default class {
    /** Session persistence for the underlying client. */
    public session: SessionManager;

    private client: IgApiClient;
    private lock: ILock;
    private lockfile: string;
    private workInterval: number;
    private bot: QQBot;
    private webshotDelay: number;
    private webshot: Webshot;
    private mode: number;
    private wsUrl: string;

    /** Users and their known stories, keyed by user name and story id. */
    private cache: {
      [userName: string]: {
        user: UserFeedResponseUser & ReelsMediaFeedResponseItem['user'],
        stories: {[storyId: string]: MediaItem},
      },
    } = {};

    /**
     * @param opt worker options; an invalid `proxyUrl` is ignored with a warning
     */
    constructor(opt: IWorkerOption) {
      this.client = new IgApiClient();
      if (opt.proxyUrl) {
        try {
          const url = new URL(opt.proxyUrl);
          if (!/^socks(?:4a?|5h?)?:$/.test(url.protocol)) throw Error();
          if (!url.port) url.port = '1080';
          this.client.request.defaults.agent = new SocksProxyAgent({
            hostname: url.hostname,
            port: url.port,
            userId: url.username,
            password: url.password,
          });
        } catch (e) {
          logger.warn(`invalid socks proxy url: ${opt.proxyUrl}, ignoring`);
        }
      }
      this.session = new SessionManager(this.client, opt.sessionLockfile, opt.credentials);
      this.lockfile = opt.lockfile;
      this.lock = opt.lock;
      this.workInterval = opt.workInterval;
      this.bot = opt.bot;
      this.webshotDelay = opt.webshotDelay;
      this.mode = opt.mode;
      this.wsUrl = opt.wsUrl;
      ScreenNameNormalizer._queryUser = this.queryUser;

      // Replace the throwing placeholder with the implementation bound to this worker.
      sendAllStories = (rawUserName, receiver) => {
        const sender = this.sendStories(`instagram stories for ${rawUserName}`, receiver);
        this.queryUser(rawUserName)
          .then(userNameId => {
            const [userName, userId] = userNameId.split(':');
            // Serve from the cache when it already holds stories for this user.
            if (userName in this.cache && Object.keys(this.cache[userName].stories).length > 0) {
              return Promise.resolve(
                Object.values(this.cache[userName].stories)
                  .map(story => ({...story, user: this.cache[userName].user}))
                  .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk))
              );
            }
            return this.client.feed.reelsMedia({userIds: [userId]}).items()
              .then(storyItems => {
                storyItems = storyItems.map(story => ({...story, user: this.cache[userName].user}));
                storyItems.forEach(item => {
                  if (!(item.pk in this.cache[userName].stories)) {
                    this.cache[userName].stories[item.pk] = item;
                  }
                });
                if (storyItems.length === 0) this.bot.sendTo(receiver, `当前用户 (@${userName}) 没有可用的推特故事。`);
                return storyItems;
              });
          })
          .then(storyItems => this.workOnMedia(storyItems, sender))
          .catch((error: IgClientError & Partial<RequestError>) => {
            if (error instanceof IgNetworkError) {
              logger.warn(`error on fetching stories for ${rawUserName}: ${JSON.stringify(error.cause)}`);
              this.bot.sendTo(receiver, `获取 Fleets 时出现错误:原因: ${error.cause}`);
            } else if (error instanceof IgLoginRequiredError) {
              logger.warn('login required, logging in again...');
              this.session.login()
                .then(() => sendAllStories(rawUserName, receiver))
                // A failed re-login must not become an unhandled rejection.
                .catch(loginError => logger.error(`failed to log in again: ${loginError}`));
            } else {
              logger.error(`unhandled error on fetching media for ${rawUserName}: ${error}`);
              this.bot.sendTo(receiver, `获取 Fleets 时发生未知错误: ${error}`);
            }
          });
      };
    }

    /**
     * Creates the webshot and, from its ready callback, starts the cache
     * refresh timers and the polling loop.
     */
    public launch = () => {
      this.webshot = new Webshot(
        this.wsUrl,
        this.mode,
        () => {
          setTimeout(this.workForAll, this.workInterval * 1000);
          setTimeout(() => {
            this.work();
            // NOTE(review): this repeat interval uses a factor of 10000 while
            // every other timer uses 1000 — confirm that this is intended.
            setInterval(this.workForAll, this.workInterval * 10000);
          }, this.workInterval * 1200);
        }
      );
    };

    /**
     * Resolves a user name to `"<username>:<pk>"`, using the cache when the
     * normalized name is already known and an exact user search otherwise.
     * A user found by search is added to the cache with no stories.
     *
     * @param rawUserName user name as typed by the user
     * @returns a promise for `"<username>:<pk>"`; rejects when the search fails
     */
    public queryUser = (rawUserName: string) => {
      const username = ScreenNameNormalizer.normalize(rawUserName).split(':')[0];
      if (username in this.cache) {
        return Promise.resolve(`${username}:${this.cache[username].user.pk}`);
      }
      return this.client.user.searchExact(username)
        .then(user => {
          this.cache[user.username] = {user, stories: {}};
          return `${user.username}:${user.pk}`;
        });
    };

    /**
     * Processes one entry of the lock's feed list and schedules the next run.
     *
     * Entries without subscribers are removed from the lock. For the others,
     * cached stories newer than the thread's offset are sent to its subscribers
     * and the thread's offset and update time are advanced. An offset of `'-1'`
     * only records the newest story; an offset of `'0'` sends only the newest
     * one. Errors while processing an entry are logged and the loop moves on.
     */
    public work = () => {
      const lock = this.lock;
      logger.debug(`current cache: ${JSON.stringify(this.cache)}`);
      if (this.workInterval < 1) this.workInterval = 1;
      if (lock.feed.length === 0) {
        setTimeout(this.work, this.workInterval * 1000);
        return;
      }
      if (lock.workon >= lock.feed.length) lock.workon = 0;

      const currentFeed = lock.feed[lock.workon];
      const currentThread = lock.threads[currentFeed];
      if (!currentThread || !currentThread.subscribers || currentThread.subscribers.length === 0) {
        logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
        delete lock.threads[currentFeed];
        lock.feed.splice(lock.workon, 1);
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        this.work();
        return;
      }

      logger.debug(`searching for new items from ${currentFeed} in cache`);
      Promise.resolve()
        .then(() => {
          const mediaItems = this.findNewStories(currentFeed);
          const updateDate = () => currentThread.updatedAt = new Date().toString();
          if (!mediaItems || mediaItems.length === 0) { updateDate(); return; }
          const topOfFeed = mediaItems[0].pk;
          const updateOffset = () => currentThread.offset = topOfFeed;
          if (currentThread.offset === '-1') { updateOffset(); return; }
          if (currentThread.offset === '0') mediaItems.splice(1);
          return this.workOnMedia(mediaItems, this.sendStories(`thread ${currentFeed}`, ...currentThread.subscribers))
            .then(updateDate).then(updateOffset);
        })
        // Without this handler a single failure would end the polling loop,
        // because the rescheduling step below would never run.
        .catch(error => {
          logger.error(`unhandled error while working on ${currentFeed}: ${error}`);
        })
        .then(() => {
          lock.workon++;
          let timeout = this.workInterval * 1000 / lock.feed.length;
          if (timeout < 1000) timeout = 1000;
          try {
            fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
          } catch (error) {
            // Keep polling even when the lock file cannot be written this time.
            logger.error(`failed to write lock file ${this.lockfile}: ${error}`);
          }
          setTimeout(() => {
            this.work();
          }, timeout);
        });
    };

    /** Returns the cached stories of a feed that are newer than its thread's offset, newest first. */
    private findNewStories = (feed: string): MediaItem[] => {
      const match = /https:\/\/www\.instagram\.com\/([^\/]+)/.exec(feed);
      const cachedFeed = match && this.cache[match[1]];
      // An unrecognised feed key or a user that is not cached yet simply has
      // nothing to send; the caller schedules the next run itself.
      if (!cachedFeed) return [];
      const offset = this.lock.threads[feed].offset;
      return Object.values(cachedFeed.stories)
        .filter(item => BigNumOps.compare(item.pk, offset) > 0)
        .map(story => ({...story, user: cachedFeed.user}))
        .sort((i1, i2) => BigNumOps.compare(i2.pk, i1.pk));
    };

    /** Hands the media items to the webshot together with the send callback and the configured delay. */
    private workOnMedia = (
      mediaItems: MediaItem[],
      sendMedia: (msg: string, text: string, author: string) => void
    ) => this.webshot(mediaItems, sendMedia, this.webshotDelay);

    /**
     * Creates the send callback used by the webshot: it pushes `msg` to every
     * chat in `to`, retrying on failure, and falls back to the plain
     * `author + text` once more than `maxTrials` attempts have failed.
     */
    private sendStories = (source?: string, ...to: IChat[]) => (msg: string, text: string, author: string) => {
      to.forEach(subscriber => {
        logger.info(`pushing data${source ? ` of ${Message.ellipseBase64(source)}` : ''} to ${JSON.stringify(subscriber)}`);
        retryOnError(
          () => this.bot.sendTo(subscriber, msg),
          (_, count, terminate: (doNothing: Promise<void>) => void) => {
            if (count <= maxTrials) {
              logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
            } else {
              logger.warn(`${count - 1} consecutive failures while sending ` +
                'message chain, trying plain text instead...');
              terminate(this.bot.sendTo(subscriber, author + text));
            }
          }
        ).catch(error => {
          // The plain-text fallback can fail as well; report it instead of
          // leaving an unhandled rejection.
          logger.error(`failed to send to ${subscriber.chatID}: ${error}`);
        });
      });
    };

    /**
     * Refreshes the story cache for every thread in the lock: makes sure each
     * thread's user is cached, then pulls the stories of all of them in one
     * request and stores the ones not yet known.
     */
    private workForAll = () => {
      const idToUserMap: {[id: number]: UserFeedResponseUser} = {};
      const lookups: Array<Promise<unknown>> = [];

      Object.entries(this.lock.threads).forEach(([link, thread]) => {
        const id = thread.id;
        const parsed = parseLink(link);
        const userName = parsed ? parsed.userName : undefined;
        if (!userName) {
          // parseLink returns nothing for keys it cannot parse.
          logger.warn(`cannot extract a user name from thread ${link}, skipping`);
          return;
        }
        logger.debug(`preparing to add user @${userName} to next pull task...`);
        if (userName in this.cache) {
          idToUserMap[id] = this.cache[userName].user;
          return;
        }
        lookups.push(this.client.user.info(id).then(user => {
          logger.debug(`initialized cache item for user ${user.full_name} (@${userName})`);
          this.cache[userName] = {user, stories: {}};
          idToUserMap[id] = user as UserFeedResponseUser;
        }));
      });

      Promise.all(lookups)
        .then(() => {
          logger.debug(`pulling stories for users: ${Object.values(idToUserMap).map(user => user.username)}`);
          const userIds = Object.keys(idToUserMap);
          if (userIds.length === 0) return undefined;
          // Returned so that the handler below also covers failures of this request.
          return this.client.feed.reelsMedia({userIds}).items()
            .then(storyItems => storyItems.forEach(item => {
              const owner = idToUserMap[item.user.pk];
              if (!owner || !(owner.username in this.cache)) return;
              const stories = this.cache[owner.username].stories;
              if (!(item.pk in stories)) stories[item.pk] = item;
            }));
        })
        .catch((error: IgClientError & Partial<RequestError>) => {
          if (error instanceof IgNetworkError) {
            logger.warn(`error on fetching stories for all: ${JSON.stringify(error.cause)}`);
          } else if (error instanceof IgLoginRequiredError) {
            logger.warn('login required, logging in again...');
            this.session.login()
              .then(this.workForAll)
              .catch(loginError => logger.error(`failed to log in again: ${loginError}`));
          } else {
            logger.error(`unhandled error on fetching media for all: ${error}`);
          }
        });
    };
  }