twitter.ts 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. import * as fs from 'fs';
  2. import * as path from 'path';
  3. import * as Twitter from 'twitter';
  4. import TwitterTypes from 'twitter-d';
  5. import { getLogger } from './loggers';
  6. import QQBot, { Message, MessageChain } from './mirai';
  7. import Webshot from './webshot';
  /**
   * Settings accepted by the worker's constructor.
   */
  interface IWorkerOption {
    // --- Twitter API credentials, forwarded verbatim to the Twitter client ---
    consumer_key: string;
    consumer_secret: string;
    access_token_key: string;
    access_token_secret: string;

    // --- Collaborators ---
    /** Bot used to deliver messages and upload pictures to subscribers. */
    bot: QQBot;
    /** Shared polling state (feeds, current position, per-feed threads). */
    lock: ILock;
    /** Path of the file the lock object is serialised to as JSON. */
    lockfile: string;

    // --- Tuning ---
    /** Polling interval; multiplied by 1000 before reaching `setTimeout`, i.e. seconds. */
    workInterval: number;
    /** Forwarded unchanged to the webshot call (units are defined by Webshot — confirm there). */
    webshotDelay: number;
    /** Forwarded unchanged to the `Webshot` constructor (meaning is defined by Webshot). */
    mode: number;
  }
  // Re-export the Twitter payload types this module builds on, so that
  // consumers do not have to import them from 'twitter-d' themselves.
  export type Entities = TwitterTypes.Entities;
  export type ExtendedEntities = TwitterTypes.ExtendedEntities;
  export type FullUser = TwitterTypes.FullUser;

  // Module-wide logger registered under the 'twitter' name.
  const logger = getLogger('twitter');
  /**
   * The subset of a tweet payload that this project reads.
   * NOTE(review): field names follow the Twitter REST API tweet object as
   * returned with `tweet_mode: 'extended'` — confirm against the API docs.
   */
  interface ITweet {
    /** Tweet id as a string. */
    id_str: string;
    /** Author of the tweet. */
    user: FullUser;
    /** Text of the tweet. */
    full_text: string;
    /** Pair of indices into `full_text`. */
    display_text_range: [number, number];
    entities: Entities;
    extended_entities: ExtendedEntities;
    /** Present only on some tweets; holds another tweet of the same shape. */
    retweeted_status?: ITweet;
  }
  /** Public name of a single tweet payload. */
  export type Tweet = ITweet;
  /** A batch of tweets, in the order they were returned by the API. */
  export type Tweets = Tweet[];
  /**
   * Twitter polling worker.
   *
   * Walks the feeds listed in the shared lock object one at a time: fetches
   * the tweets newer than the stored offset, hands them to the webshot
   * renderer together with an uploader and a sender, then records the new
   * offset, persists the lock to `lockfile` and schedules the next feed.
   *
   * A failure while processing one feed is logged and the loop moves on to
   * the next feed; the offset of the failed feed is left untouched so the
   * same tweets are fetched again on its next turn.
   */
  export default class {

    private client: Twitter;
    private lock: ILock;
    private lockfile: string;
    private workInterval: number;
    private bot: QQBot;
    private webshotDelay: number;
    private webshot: Webshot;
    private mode: number;

    /**
     * @param opt Credentials for the Twitter client plus the collaborators
     *            and timing settings copied onto the instance.
     */
    constructor(opt: IWorkerOption) {
      this.client = new Twitter({
        consumer_key: opt.consumer_key,
        consumer_secret: opt.consumer_secret,
        access_token_key: opt.access_token_key,
        access_token_secret: opt.access_token_secret,
      });
      this.lockfile = opt.lockfile;
      this.lock = opt.lock;
      this.workInterval = opt.workInterval;
      this.bot = opt.bot;
      this.webshotDelay = opt.webshotDelay;
      this.mode = opt.mode;
    }

    /**
     * Creates the Webshot instance. The callback handed to it schedules the
     * first `work` run after one polling interval; when that callback is
     * invoked is decided by Webshot (presumably once it is ready — confirm).
     */
    public launch = () => {
      this.webshot = new Webshot(
        this.mode,
        () => setTimeout(this.work, this.workInterval * 1000)
      );
    }

    /**
     * Processes the feed at `lock.workon` and schedules itself again.
     *
     * - With no feeds at all, it simply retries after one interval.
     * - A feed without subscribers is removed from the lock (which is saved
     *   immediately) and the next feed is tried right away.
     * - Otherwise the feed is fetched and pushed; afterwards `workon` is
     *   advanced, the lock is saved and the next run is scheduled so that a
     *   full round over all feeds takes about `workInterval` seconds, with
     *   at least one second between two feeds.
     */
    public work = () => {
      const lock = this.lock;
      if (this.workInterval < 1) this.workInterval = 1;

      if (lock.feed.length === 0) {
        setTimeout(() => {
          this.work();
        }, this.workInterval * 1000);
        return;
      }

      if (lock.workon >= lock.feed.length) lock.workon = 0;
      const currentFeed = lock.feed[lock.workon];
      const currentThread = lock.threads[currentFeed];

      if (!currentThread || !currentThread.subscribers || currentThread.subscribers.length === 0) {
        logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
        delete lock.threads[currentFeed];
        lock.feed.splice(lock.workon, 1);
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        this.work();
        return;
      }

      logger.debug(`pulling feed ${currentFeed}`);

      this.fetchTweets(currentFeed)
        .then(tweets => this.pushTweets(currentFeed, tweets))
        .catch(error => {
          // Without this handler a single rejection would end the chain
          // before the rescheduling step below and stop polling for good.
          logger.error(`failed to process feed ${currentFeed}: ${this.describeError(error)}`);
        })
        .then(() => {
          lock.workon++;
          // The lock is shared and may have been emptied while this feed was
          // being processed; never divide by zero.
          let timeout = this.workInterval * 1000 / Math.max(lock.feed.length, 1);
          if (timeout < 1000) timeout = 1000;
          try {
            fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
          } catch (error) {
            // A failed save must not prevent the next run from being scheduled.
            logger.error(`failed to save lockfile ${this.lockfile}: ${this.describeError(error)}`);
          }
          setTimeout(() => {
            this.work();
          }, timeout);
        });
    }

    /**
     * Requests the tweets of one feed from the Twitter API.
     *
     * A URL of the form `https://twitter.com/<owner>/lists/<slug>` is read
     * through `lists/statuses`, any other `https://twitter.com/<name>` URL
     * through `statuses/user_timeline`.
     *
     * @param currentFeed Feed URL; its thread must exist in the lock.
     * @returns A promise that always resolves: with the tweets on success, or
     *          with `undefined` when the URL is not recognised or the API
     *          reported an error (which is logged here).
     */
    private fetchTweets(currentFeed: string): Promise<Tweets | undefined> {
      return new Promise<Tweets | undefined>(resolve => {
        let endpoint: string | undefined;
        let config: { [key: string]: string | boolean } = {};

        const listMatch = currentFeed.match(/https:\/\/twitter\.com\/([^\/]+)\/lists\/([^\/]+)/);
        if (listMatch) {
          endpoint = 'lists/statuses';
          config = {
            owner_screen_name: listMatch[1],
            slug: listMatch[2],
            tweet_mode: 'extended',
          };
        } else {
          const userMatch = currentFeed.match(/https:\/\/twitter\.com\/([^\/]+)/);
          if (userMatch) {
            endpoint = 'statuses/user_timeline';
            config = {
              screen_name: userMatch[1],
              exclude_replies: false,
              tweet_mode: 'extended',
            };
          }
        }

        if (!endpoint) {
          // Settle anyway: a promise left pending here would stall the whole
          // polling loop, because the next run is scheduled from its chain.
          logger.warn(`feed ${currentFeed} is neither a list nor a user timeline link, skipping`);
          resolve(undefined);
          return;
        }

        // The offset is stored as a string; it is only converted for the
        // comparison and sent as-is.
        const offset = this.lock.threads[currentFeed].offset;
        if (Number(offset) > 0) config.since_id = offset;

        this.client.get(endpoint, config, (error, tweets) => {
          if (!error) {
            // The response is not validated here; it is trusted to be a tweet array.
            resolve(tweets as Tweets);
            return;
          }
          // NOTE(review): code 34 is treated as "target does not exist" —
          // confirm against the Twitter API error code table.
          if (Array.isArray(error) && error.length > 0 && error[0].code === 34) {
            logger.warn(`error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
            // The thread may have been removed while the request was in flight.
            const thread = this.lock.threads[currentFeed];
            const subscribers = (thread && thread.subscribers) || [];
            subscribers.forEach(subscriber => {
              logger.info(`sending notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}`);
              // Message: "the user or list this link points to does not exist, please unsubscribe".
              this.bot.sendTo(subscriber, `链接 ${currentFeed} 指向的用户或列表不存在,请退订。`)
                .catch(sendError => {
                  logger.error(`failed to send notfound message of ${currentFeed} to ` +
                    `${JSON.stringify(subscriber)}: ${this.describeError(sendError)}`);
                });
            });
          } else {
            logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
          }
          resolve(undefined);
        });
      });
    }

    /**
     * Pushes freshly fetched tweets of one feed to its subscribers and
     * updates the thread's bookkeeping.
     *
     * - No tweets: only `updatedAt` is refreshed.
     * - Stored offset `'-1'`: the newest tweet id is recorded as the offset
     *   and nothing is sent.
     * - Stored offset `'0'`: only the first (newest) tweet is kept.
     * - Otherwise all tweets go through the webshot call; once it resolves,
     *   `updatedAt` and the offset are updated.
     *
     * @param currentFeed Feed URL the tweets belong to.
     * @param tweets      Tweets returned by the API, or `undefined`.
     * @returns A promise that settles when the feed has been handled.
     */
    private pushTweets(currentFeed: string, tweets: Tweets | undefined): Promise<void> {
      logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);

      const currentThread = this.lock.threads[currentFeed];
      if (!currentThread) {
        logger.warn(`thread ${currentFeed} disappeared while its tweets were being fetched, skipping`);
        return Promise.resolve();
      }

      const updateDate = () => { currentThread.updatedAt = new Date().toString(); };
      if (!tweets || tweets.length === 0) {
        updateDate();
        return Promise.resolve();
      }

      const topOfFeed = tweets[0].id_str;
      const updateOffset = () => { currentThread.offset = topOfFeed; };
      if (currentThread.offset === '-1') {
        updateOffset();
        return Promise.resolve();
      }
      if (currentThread.offset === '0') tweets.splice(1);

      const maxCount = 3;          // retries before falling back
      const uploadTimeout = 10000; // initial upload timeout handed to uploadPic
      const retryInterval = 1500;  // delay between two attempts, in milliseconds

      // English ordinal for log messages: 1st, 2nd, 3rd, 4th, 11th, 12th, 21st...
      const ordinal = (n: number) => {
        switch ((~~(n / 10) % 10 === 1) ? 0 : n % 10) {
          case 1:
            return `${n}st`;
          case 2:
            return `${n}nd`;
          case 3:
            return `${n}rd`;
          default:
            return `${n}th`;
        }
      };

      // Runs `doWork` until it succeeds. After each failure, `onRetry` is
      // called `retryInterval` ms later with the error and the number of
      // failures so far; it may call `terminate(value)` to stop retrying
      // and resolve with `value` instead.
      const retryOnError = <T, U>(
        doWork: () => Promise<T>,
        onRetry: (error: unknown, count: number, terminate: (defaultValue: U) => void) => void
      ) => new Promise<T | U>(resolve => {
        const retry = (reason: unknown, count: number) => {
          setTimeout(() => {
            let terminate = false;
            onRetry(reason, count, defaultValue => { terminate = true; resolve(defaultValue); });
            if (!terminate) doWork().then(resolve).catch(error => retry(error, count + 1));
          }, retryInterval);
        };
        doWork().then(resolve).catch(error => retry(error, 1));
      });

      // Uploads a picture message, retrying with a growing timeout; after
      // `maxCount` retries it resolves with the plain-text `lastResort`.
      const uploader = (
        message: ReturnType<typeof Message.Image>,
        lastResort: (...args: unknown[]) => ReturnType<typeof Message.Plain>
      ) => {
        let timeout = uploadTimeout;
        return retryOnError(
          () => this.bot.uploadPic(message, timeout).then(() => message),
          (_, count, terminate: (defaultValue: ReturnType<typeof Message.Plain>) => void) => {
            if (count <= maxCount) {
              timeout *= (count + 2) / (count + 1);
              logger.warn(`retry uploading for the ${ordinal(count)} time...`);
            } else {
              logger.warn(`${count - 1} consecutive failures while uploading, trying plain text instead...`);
              terminate(lastResort());
            }
          });
      };

      // Sends a message chain to every subscriber of the thread; after
      // `maxCount` retries the plain `author + text` is sent instead.
      const sendTweets = (msg: MessageChain, text: string, author: string) => {
        currentThread.subscribers.forEach(subscriber => {
          logger.info(`pushing data of thread ${currentFeed} to ${JSON.stringify(subscriber)}`);
          retryOnError(
            () => this.bot.sendTo(subscriber, msg),
            (_, count, terminate: (doNothing: Promise<void>) => void) => {
              if (count <= maxCount) {
                logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
              } else {
                logger.warn(`${count - 1} consecutive failures while sending ` +
                  'message chain, trying plain text instead...');
                terminate(this.bot.sendTo(subscriber, author + text));
              }
            })
            .catch(error => {
              // Only the plain-text fallback can reject here; nothing is left to try.
              logger.error(`failed to push thread ${currentFeed} to ${subscriber.chatID}: ` +
                this.describeError(error));
            });
        });
      };

      return this.webshot(tweets, uploader, sendTweets, this.webshotDelay)
        .then(() => {
          updateDate();
          updateOffset();
        });
    }

    /** Renders a caught value for a log line (`Error`s would stringify to `{}` as JSON). */
    private describeError(error: unknown): string {
      if (error instanceof Error) return error.stack || error.message;
      return JSON.stringify(error);
    }
  }