twitter.ts 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import * as fs from 'fs';
  2. import * as log4js from 'log4js';
  3. import * as path from 'path';
  4. import * as redis from 'redis';
  5. import { RedisClient } from 'redis';
  6. import * as sha1 from 'sha1';
  7. import * as Twitter from 'twitter';
  8. import QQBot from './cqhttp';
  9. import Webshot from './webshot';
  /**
   * Options accepted by the feed worker's constructor.
   */
  interface IWorkerOption {
    // --- Twitter API credentials, forwarded as-is to the Twitter client ---
    consumer_key: string;
    consumer_secret: string;
    access_token_key: string;
    access_token_secret: string;

    // --- Subscription state ---
    /** In-memory lock object holding the feed list, per-feed threads and the work cursor. */
    lock: ILock;
    /** Path of the file the lock object is serialized to as JSON. */
    lockfile: string;

    // --- Delivery ---
    /** Bot used to send messages to subscribers. */
    bot: QQBot;
    /** Passed to the webshot call together with the tweets; also selects the message format. */
    mode: number;
    /** Passed through to the webshot call as its final argument. */
    webshotDelay: number;

    // --- Scheduling ---
    /** Polling interval; the worker multiplies it by 1000 when arming timers. */
    workInterval: number;

    // --- De-duplication ---
    /** Redis connection settings; the worker only creates a redis client when this is set. */
    redis: IRedisConfig;
  }
  // Logger shared by everything in this module; its level is taken from the
  // `loglevel` value found on the Node.js global object.
  const logger = log4js.getLogger('twitter');
  logger.level = (global as any)['loglevel'];
  /**
   * Round-robin poller for subscribed Twitter feeds.
   *
   * On every cycle the worker picks the feed at `lock.workon`, fetches tweets
   * newer than the stored offset, hands them to the webshot renderer and sends
   * the result to each subscriber of that feed through the bot. The lock
   * object is written back to `lockfile` after every cycle.
   */
  export default class TwitterWorker {
    private client;
    private lock: ILock;
    private lockfile: string;
    private workInterval: number;
    private bot: QQBot;
    private webshotDelay: number;
    private webshot: Webshot;
    private redisConfig: IRedisConfig;
    private redisClient: RedisClient;
    private mode: number;

    /**
     * Translates a feed URL into the API endpoint and query parameters used
     * to fetch it.
     *
     * @param feedUrl a list URL (`https://twitter.com/<owner>/lists/<slug>`)
     *   or a user URL (`https://twitter.com/<screen_name>`)
     * @returns the endpoint/config pair, or `undefined` when the URL matches
     *   neither form
     */
    private static resolveRequest(feedUrl: string): { endpoint: string; config: any } | undefined {
      // Lists must be tried first: a list URL also matches the user pattern.
      const listMatch = feedUrl.match(/https:\/\/twitter\.com\/([^\/]+)\/lists\/([^\/]+)/);
      if (listMatch) {
        return {
          endpoint: 'lists/statuses',
          config: {
            owner_screen_name: listMatch[1],
            slug: listMatch[2],
            tweet_mode: 'extended',
          },
        };
      }
      const userMatch = feedUrl.match(/https:\/\/twitter\.com\/([^\/]+)/);
      if (userMatch) {
        return {
          endpoint: 'statuses/user_timeline',
          config: {
            screen_name: userMatch[1],
            exclude_replies: false,
            tweet_mode: 'extended',
          },
        };
      }
      return undefined;
    }

    /**
     * Computes the pause before the next cycle, in milliseconds: the work
     * interval is spread evenly over the feeds, with a floor of one second.
     *
     * @param workInterval polling interval (multiplied by 1000 here)
     * @param feedCount number of feeds currently in the lock
     * @returns delay in milliseconds
     */
    private static rescheduleDelay(workInterval: number, feedCount: number): number {
      // The feed list can be emptied while a fetch is in flight; clamp the
      // divisor so the delay never becomes Infinity.
      const perFeed = workInterval * 1000 / Math.max(feedCount, 1);
      return Math.max(perFeed, 1000);
    }

    /**
     * @param opt credentials, shared lock state, bot and timing settings
     */
    constructor(opt: IWorkerOption) {
      this.client = new Twitter({
        consumer_key: opt.consumer_key,
        consumer_secret: opt.consumer_secret,
        access_token_key: opt.access_token_key,
        access_token_secret: opt.access_token_secret,
      });
      this.lockfile = opt.lockfile;
      this.lock = opt.lock;
      this.workInterval = opt.workInterval;
      this.bot = opt.bot;
      this.webshotDelay = opt.webshotDelay;
      this.redisConfig = opt.redis;
      this.mode = opt.mode;
    }

    /**
     * Connects to redis when a redis config was supplied, then creates the
     * webshot renderer; the callback given to the renderer schedules the
     * first `work` cycle after one work interval.
     */
    public launch = () => {
      if (this.redisConfig) {
        this.redisClient = redis.createClient({
          host: this.redisConfig.redisHost,
          port: this.redisConfig.redisPort,
        });
      }
      this.webshot = new Webshot(() => setTimeout(this.work, this.workInterval * 1000));
    }

    /**
     * Runs one polling cycle for the feed at `lock.workon` and schedules the
     * next one. The cycle always reschedules itself, even when fetching,
     * rendering or persisting fails.
     */
    public work = () => {
      const lock = this.lock;
      if (this.workInterval < 1) this.workInterval = 1;

      // Nothing subscribed at all: idle for one interval and look again.
      if (lock.feed.length === 0) {
        setTimeout(() => {
          this.work();
        }, this.workInterval * 1000);
        return;
      }
      if (lock.workon >= lock.feed.length) lock.workon = 0;

      // Capture the feed and its thread once: the lock may be mutated while
      // the asynchronous steps below are in flight, and re-reading
      // `lock.feed[lock.workon]` later could hit a different or missing thread.
      const feedUrl = lock.feed[lock.workon];
      const thread = lock.threads[feedUrl];

      if (!thread || !thread.subscribers || thread.subscribers.length === 0) {
        logger.warn(`nobody subscribes thread ${feedUrl}, removing from feed`);
        delete lock.threads[feedUrl];
        lock.feed.splice(lock.workon, 1);
        fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
        this.work();
        return;
      }

      logger.debug(`pulling feed ${feedUrl}`);

      const fetched = new Promise<any>(resolve => {
        const request = TwitterWorker.resolveRequest(feedUrl);
        if (!request) {
          // Without this the promise would never settle and the loop would stall.
          logger.error(`feed ${feedUrl} is not a recognized twitter url, skipping`);
          resolve(undefined);
          return;
        }
        const offset = thread.offset;
        if (offset > 0) request.config.since_id = offset;
        this.client.get(request.endpoint, request.config, (error, tweets) => {
          if (!error) {
            resolve(tweets);
            return;
          }
          // Code 34 is handled by telling subscribers the target does not exist.
          if (Array.isArray(error) && error.length > 0 && error[0].code === 34) {
            logger.warn(`error on fetching tweets for ${feedUrl}: ${JSON.stringify(error)}`);
            thread.subscribers.forEach(subscriber => {
              logger.info(`sending notfound message of ${feedUrl} to ${JSON.stringify(subscriber)}`);
              this.sendMessage(subscriber, `链接 ${feedUrl} 指向的用户或列表不存在,请退订。`);
            });
          } else {
            logger.error(`unhandled error on fetching tweets for ${feedUrl}: ${JSON.stringify(error)}`);
          }
          resolve(undefined);
        });
      });

      fetched
        .then((tweets: any) => {
          logger.debug(`api returned ${JSON.stringify(tweets)} for feed ${feedUrl}`);
          if (!tweets || tweets.length === 0) {
            thread.updatedAt = new Date().toString();
            return;
          }
          // Offset -1: only record the newest tweet id, push nothing.
          if (thread.offset === -1) {
            thread.offset = tweets[0].id_str;
            return;
          }
          // Offset 0: push only the newest tweet.
          if (thread.offset === 0) tweets.splice(1);
          return (this.webshot as any)(this.mode, tweets, (msg, text, author) => {
            thread.subscribers.forEach(subscriber => {
              logger.info(`pushing data of thread ${feedUrl} to ${JSON.stringify(subscriber)}`);
              this.deliver(subscriber, this.mode === 0 ? msg : author + text, text);
            });
          }, this.webshotDelay)
            .then(() => {
              thread.offset = tweets[0].id_str;
              thread.updatedAt = new Date().toString();
            });
        })
        .catch((err: unknown) => {
          // A failed cycle must not kill the loop; log and fall through to rescheduling.
          const detail = err instanceof Error ? err.stack : JSON.stringify(err);
          logger.error(`unexpected error while processing feed ${feedUrl}: ${detail}`);
        })
        .then(() => {
          lock.workon++;
          try {
            fs.writeFileSync(path.resolve(this.lockfile), JSON.stringify(lock));
          } catch (err) {
            logger.error(`failed to write lockfile ${this.lockfile}: ${err}`);
          }
          setTimeout(() => {
            this.work();
          }, TwitterWorker.rescheduleDelay(this.workInterval, lock.feed.length));
        });
    }

    /**
     * Sends `message` to one subscriber through the bot's `send_msg` action.
     * The chat id is supplied under all three id fields, alongside the
     * subscriber's chat type.
     */
    private sendMessage(subscriber: { chatType: any; chatID: any }, message: any): void {
      this.bot.bot('send_msg', {
        message_type: subscriber.chatType,
        user_id: subscriber.chatID,
        group_id: subscriber.chatID,
        discuss_id: subscriber.chatID,
        message,
      });
    }

    /**
     * Sends a rendered tweet to one subscriber. When a redis client is
     * available, a sha1 of subscriber + tweet text is used as a key to skip
     * content already sent to that subscriber.
     *
     * @param subscriber the subscriber to notify
     * @param message payload to send
     * @param text tweet text used for the de-duplication key
     */
    private deliver(subscriber: { chatType: any; chatID: any }, message: any, text: any): void {
      const material = JSON.stringify(subscriber) + text;
      logger.debug(material);
      const hash = sha1(material);
      logger.debug(hash);

      if (!this.redisClient) {
        this.sendMessage(subscriber, message);
        return;
      }
      this.redisClient.exists(hash, (err, res) => {
        logger.debug('redis: ', res);
        if (err) {
          // Best effort: a redis failure is logged and the message is still sent.
          logger.error('redis error: ', err);
        } else if (res) {
          logger.info('key hash exists, skip this subscriber');
          return;
        }
        this.sendMessage(subscriber, message);
        this.redisClient.set(hash, 'true', 'EX', this.redisConfig.redisExpireTime, (setErr, setRes) => {
          logger.debug('redis: ', setRes);
          if (setErr) {
            logger.error('redis error: ', setErr);
          }
        });
      });
    }
  }