// twitter.js
  "use strict";
  // Marks this CommonJS module as a transpiled ES module for interop purposes.
  Object.defineProperty(exports, "__esModule", { value: true });
  const fs = require("fs");
  const path = require("path");
  const Twitter = require("twitter");
  const loggers_1 = require("./loggers");
  const webshot_1 = require("./webshot");
  // Module-wide logger obtained from ./loggers under the name 'twitter'.
  const logger = loggers_1.getLogger('twitter');
  /** Number of retries attempted before falling back to the plain-text path. */
  const MAX_RETRY_COUNT = 3;
  /** Initial timeout (milliseconds) handed to `bot.uploadPic`. */
  const UPLOAD_TIMEOUT_MS = 10000;
  /** Delay (milliseconds) before each retry attempt. */
  const RETRY_INTERVAL_MS = 1500;

  /**
   * Formats an integer as an English ordinal, e.g. 1 -> "1st", 12 -> "12th".
   *
   * @param {number} n - The number to format.
   * @returns {string} The number followed by its ordinal suffix.
   */
  function ordinal(n) {
    // 10..19 (and 110..119, ...) always take "th", so treat them as digit 0.
    const isTeen = ~~(n / 10) % 10 === 1;
    switch (isTeen ? 0 : n % 10) {
      case 1:
        return `${n}st`;
      case 2:
        return `${n}nd`;
      case 3:
        return `${n}rd`;
      default:
        return `${n}th`;
    }
  }

  /**
   * Maps a feed URL to the API endpoint and base query used to pull it.
   *
   * @param {string} feedUrl - A `https://twitter.com/<user>` or
   *   `https://twitter.com/<user>/lists/<slug>` URL.
   * @returns {{endpoint: string, config: Object} | null} The request
   *   description, or `null` when the URL matches neither form.
   */
  function resolveFeedRequest(feedUrl) {
    // The list pattern must be tried first: a list URL also matches the user pattern.
    const listMatch = feedUrl.match(/https:\/\/twitter\.com\/([^\/]+)\/lists\/([^\/]+)/);
    if (listMatch) {
      return {
        endpoint: 'lists/statuses',
        config: {
          owner_screen_name: listMatch[1],
          slug: listMatch[2],
          tweet_mode: 'extended',
        },
      };
    }
    const userMatch = feedUrl.match(/https:\/\/twitter\.com\/([^\/]+)/);
    if (userMatch) {
      return {
        endpoint: 'statuses/user_timeline',
        config: {
          screen_name: userMatch[1],
          exclude_replies: false,
          tweet_mode: 'extended',
        },
      };
    }
    return null;
  }

  /**
   * Runs `doWork` and, whenever it rejects, waits RETRY_INTERVAL_MS, consults
   * `onRetry`, and runs it again unless `onRetry` terminated the loop.
   *
   * @param {function(): Promise} doWork - Produces the promise to (re)try.
   * @param {function(*, number, function(*): void): void} onRetry - Called
   *   before each retry with (rejection reason, 1-based retry count, terminate).
   *   Calling `terminate(value)` stops retrying and resolves with `value`.
   * @returns {Promise} Resolves with the result of `doWork` or with the value
   *   passed to `terminate`; rejects only if that value is a rejecting promise.
   */
  function retryOnError(doWork, onRetry) {
    return new Promise(resolve => {
      const retry = (reason, count) => {
        setTimeout(() => {
          let terminated = false;
          onRetry(reason, count, defaultValue => {
            terminated = true;
            resolve(defaultValue);
          });
          if (!terminated)
            doWork().then(resolve).catch(error => retry(error, count + 1));
        }, RETRY_INTERVAL_MS);
      };
      doWork().then(resolve).catch(error => retry(error, 1));
    });
  }

  /**
   * Polls the feeds listed in a shared lock object one at a time and hands
   * new tweets to `this.webshot`, persisting the lock to `lockfile` after
   * every round.
   *
   * Options read from `opt`: `consumer_key`, `consumer_secret`,
   * `access_token_key`, `access_token_secret` (passed to the Twitter client),
   * `lockfile`, `lock`, `workInterval` (seconds), `bot`, `webshotDelay`, `mode`.
   */
  class default_1 {
    constructor(opt) {
      /**
       * Creates the webshot instance; the callback given to it schedules the
       * first `work` round after `workInterval` seconds. `work` relies on
       * `this.webshot`, which is only assigned here.
       */
      this.launch = () => {
        this.webshot = new webshot_1.default(this.mode, () => setTimeout(this.work, this.workInterval * 1000));
      };
      /**
       * Processes the feed at `lock.workon` and schedules the next round.
       * Feeds without subscribers are dropped from the lock; a failure while
       * processing one feed is logged and does not stop the polling loop.
       */
      this.work = () => {
        const lock = this.lock;
        if (this.workInterval < 1)
          this.workInterval = 1;
        // Nothing to poll: check again after a full interval.
        if (lock.feed.length === 0) {
          setTimeout(() => {
            this.work();
          }, this.workInterval * 1000);
          return;
        }
        if (lock.workon >= lock.feed.length)
          lock.workon = 0;
        const currentFeed = lock.feed[lock.workon];
        const subscribedThread = lock.threads[currentFeed];
        if (!subscribedThread ||
          !subscribedThread.subscribers ||
          subscribedThread.subscribers.length === 0) {
          logger.warn(`nobody subscribes thread ${currentFeed}, removing from feed`);
          delete lock.threads[currentFeed];
          lock.feed.splice(lock.workon, 1);
          fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
          // The next feed has shifted into the current index; process it right away.
          this.work();
          return;
        }
        logger.debug(`pulling feed ${currentFeed}`);
        const fetchTweets = new Promise(resolve => {
          const request = resolveFeedRequest(currentFeed);
          if (!request) {
            // Resolve anyway: leaving the promise pending would stall the whole loop.
            logger.warn(`feed ${currentFeed} is neither a user nor a list url, skipping`);
            resolve();
            return;
          }
          const { endpoint, config } = request;
          const offset = lock.threads[currentFeed].offset;
          // Positive offset: only fetch tweets newer than it.
          if (offset > 0)
            config.since_id = offset;
          // Offset below -1 is a '-'-prefixed id: fetch tweets up to that id.
          if (offset < -1)
            config.max_id = String(offset).slice(1);
          this.client.get(endpoint, config, (error, tweets) => {
            if (!error) {
              resolve(tweets);
              return;
            }
            // API error code 34 is treated here as "user or list does not exist".
            if (Array.isArray(error) && error.length > 0 && error[0].code === 34) {
              logger.warn(`error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
              lock.threads[currentFeed].subscribers.forEach(subscriber => {
                logger.info(`sending notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}`);
                this.bot.sendTo(subscriber, `链接 ${currentFeed} 指向的用户或列表不存在,请退订。`)
                  .catch(sendError => {
                    logger.warn(`failed to send notfound message of ${currentFeed} to ${JSON.stringify(subscriber)}: ${sendError}`);
                  });
              });
            }
            else {
              logger.error(`unhandled error on fetching tweets for ${currentFeed}: ${JSON.stringify(error)}`);
            }
            resolve();
          });
        });
        fetchTweets
          .then((tweets) => {
            logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${currentFeed}`);
            const currentThread = lock.threads[currentFeed];
            const updateDate = () => currentThread.updatedAt = new Date().toString();
            if (!tweets || tweets.length === 0) {
              updateDate();
              return;
            }
            const topOfFeed = tweets[0].id_str;
            logger.info(`current offset: ${currentThread.offset}, current top of feed: ${topOfFeed}`);
            const bottomOfFeed = tweets[tweets.length - 1].id_str;
            const setOffset = (offset) => currentThread.offset = offset;
            const updateOffset = () => setOffset(topOfFeed);
            // Only original (non-retweet) tweets carrying extended entities are forwarded.
            tweets = tweets.filter(twi => !twi.retweeted_status && twi.extended_entities);
            logger.info(`found ${tweets.length} tweets with extended entities`);
            // Offset '-1': just record the current top of the feed, send nothing.
            if (currentThread.offset === '-1') {
              updateOffset();
              return;
            }
            if (currentThread.offset <= 0) {
              if (tweets.length === 0) {
                // Nothing usable on this page: remember its bottom id as a
                // '-'-prefixed offset and stay on this feed for the next round
                // (the decrement cancels the increment done after this handler).
                setOffset('-' + bottomOfFeed);
                lock.workon--;
                return;
              }
              // Keep only the first matching tweet.
              tweets.splice(1);
            }
            if (tweets.length === 0) {
              updateDate();
              updateOffset();
              return;
            }
            // Uploads `message` via the bot, lengthening the timeout on each
            // retry; after MAX_RETRY_COUNT retries resolves with `lastResort()`.
            const uploader = (message, lastResort) => {
              let timeout = UPLOAD_TIMEOUT_MS;
              return retryOnError(() => this.bot.uploadPic(message, timeout).then(() => message), (_, count, terminate) => {
                if (count <= MAX_RETRY_COUNT) {
                  timeout *= (count + 2) / (count + 1);
                  logger.warn(`retry uploading for the ${ordinal(count)} time...`);
                }
                else {
                  logger.warn(`${count - 1} consecutive failures while uploading, trying plain text instead...`);
                  terminate(lastResort());
                }
              });
            };
            // Sends `msg` to every subscriber; after MAX_RETRY_COUNT retries
            // falls back to sending `author + text` instead.
            const sendTweets = (msg, text, author) => {
              currentThread.subscribers.forEach(subscriber => {
                logger.info(`pushing data of thread ${currentFeed} to ${JSON.stringify(subscriber)}`);
                retryOnError(() => this.bot.sendTo(subscriber, msg), (_, count, terminate) => {
                  if (count <= MAX_RETRY_COUNT) {
                    logger.warn(`retry sending to ${subscriber.chatID} for the ${ordinal(count)} time...`);
                  }
                  else {
                    logger.warn(`${count - 1} consecutive failures while sending message chain, trying plain text instead...`);
                    terminate(this.bot.sendTo(subscriber, author + text));
                  }
                }).catch(sendError => {
                  // The plain-text fallback itself failed; log instead of
                  // leaving an unhandled rejection.
                  logger.error(`giving up sending thread ${currentFeed} to ${subscriber.chatID}: ${sendError}`);
                });
              });
            };
            return this.webshot(tweets, uploader, sendTweets, this.webshotDelay)
              .then(updateDate).then(updateOffset);
          })
          .catch(error => {
            // Keep polling: the offset was not advanced, so this feed is
            // simply processed again on a later round.
            logger.error(`unexpected error while processing feed ${currentFeed}: ${error && error.stack ? error.stack : error}`);
          })
          .then(() => {
            lock.workon++;
            // Spread one workInterval across all feeds, at least one second apart.
            // Guard the divisor in case the feed list was emptied meanwhile.
            let timeout = this.workInterval * 1000 / Math.max(lock.feed.length, 1);
            if (timeout < 1000)
              timeout = 1000;
            fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
            setTimeout(() => {
              this.work();
            }, timeout);
          });
      };
      this.client = new Twitter({
        consumer_key: opt.consumer_key,
        consumer_secret: opt.consumer_secret,
        access_token_key: opt.access_token_key,
        access_token_secret: opt.access_token_secret,
      });
      this.lockfile = opt.lockfile;
      this.lock = opt.lock;
      this.workInterval = opt.workInterval;
      this.bot = opt.bot;
      this.webshotDelay = opt.webshotDelay;
      this.mode = opt.mode;
    }
  }
  // Expose the feed worker class as this module's default export.
  exports.default = default_1;