|
@@ -1,7 +1,13 @@
|
|
import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis';
|
|
import { createClient, Callback, ClientOpts, OverloadedCommand, RedisClient } from 'redis';
|
|
import { getLogger } from './loggers';
|
|
import { getLogger } from './loggers';
|
|
|
|
|
|
-const logger = getLogger('redis');
|
|
|
|
|
|
+const loggers = {
|
|
|
|
+ main: getLogger('redis'),
|
|
|
|
+ cacher: getLogger('redis/cache'),
|
|
|
|
+ controller: getLogger('redis/controller'),
|
|
|
|
+ replica: getLogger('redis/replica'),
|
|
|
|
+ agent: getLogger('redis/agent'),
|
|
|
|
+};
|
|
|
|
|
|
interface Client extends Omit<Omit<RedisClient, 'pubsub'>, 'duplicate'> {
|
|
interface Client extends Omit<Omit<RedisClient, 'pubsub'>, 'duplicate'> {
|
|
pubsub: OverloadedCommand<string | string[], (string | number)[], boolean>;
|
|
pubsub: OverloadedCommand<string | string[], (string | number)[], boolean>;
|
|
@@ -21,20 +27,25 @@ export default class {
|
|
port: opt.redisPort,
|
|
port: opt.redisPort,
|
|
}) as unknown as Client;
|
|
}) as unknown as Client;
|
|
this.subscriber = this.client.duplicate();
|
|
this.subscriber = this.client.duplicate();
|
|
|
|
+ let logger = loggers.main;
|
|
this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
|
|
this.client.pubsub('NUMSUB', 'twitter:locks', (err, reply) => {
|
|
if (err) {
|
|
if (err) {
|
|
logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
|
|
logger.fatal(`failed to query subscribers of global lock registry, error: ${err}`);
|
|
process.exit(1);
|
|
process.exit(1);
|
|
}
|
|
}
|
|
logger.debug(`subscribers of global lock registry: ${reply[1]}`);
|
|
logger.debug(`subscribers of global lock registry: ${reply[1]}`);
|
|
- if (reply[1] > 0) return;
|
|
|
|
- this.subscriber.subscribe('twitter:locks', err => {
|
|
|
|
- if (err) {
|
|
|
|
- logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
|
|
|
|
- process.exit(1);
|
|
|
|
- }
|
|
|
|
- logger.info(`nobody monitoring global lock registry, taken over now`);
|
|
|
|
- });
|
|
|
|
|
|
+ if (reply[1] === 0) {
|
|
|
|
+ logger = loggers.controller;
|
|
|
|
+ this.subscriber.subscribe('twitter:locks', err => {
|
|
|
|
+ if (err) {
|
|
|
|
+ logger.fatal(`failed to subscribe to global lock registry, error: ${err}`);
|
|
|
|
+ process.exit(1);
|
|
|
|
+ }
|
|
|
|
+ logger.info(`nobody monitoring global lock registry, taken over now`);
|
|
|
|
+ });
|
|
|
|
+ } else {
|
|
|
|
+ logger = loggers.replica;
|
|
|
|
+ }
|
|
this.subscriber.psubscribe(`twitter:lock/*`, err => {
|
|
this.subscriber.psubscribe(`twitter:lock/*`, err => {
|
|
if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`);
|
|
if (err) return logger.error(`failed to subscribe to active process locks, error: ${err}`);
|
|
logger.debug(`monitoring all active locks`);
|
|
logger.debug(`monitoring all active locks`);
|
|
@@ -82,7 +93,7 @@ export default class {
|
|
});
|
|
});
|
|
this.subscriptions = {};
|
|
this.subscriptions = {};
|
|
this.expireAfter = opt.redisExpireTime;
|
|
this.expireAfter = opt.redisExpireTime;
|
|
- logger.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
|
|
|
|
|
|
+ loggers.main.info(`loaded redis service at ${opt.redisHost}:${opt.redisPort}`);
|
|
}
|
|
}
|
|
|
|
|
|
private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`;
|
|
private chatAsString = (chat: IChat) => `${chat.chatType}:${chat.chatID.toString()}`;
|
|
@@ -93,9 +104,9 @@ export default class {
|
|
err ? reject(err) : resolve(res)
|
|
err ? reject(err) : resolve(res)
|
|
)
|
|
)
|
|
).then(res => {
|
|
).then(res => {
|
|
- logger.debug(`cached content ${contentId}, result: ${res}`);
|
|
|
|
|
|
+ loggers.cacher.debug(`cached content ${contentId}, result: ${res}`);
|
|
}).catch((err: Error) => {
|
|
}).catch((err: Error) => {
|
|
- logger.error(`failed to cache content ${contentId}, error: ${err}`);
|
|
|
|
|
|
+ loggers.cacher.error(`failed to cache content ${contentId}, error: ${err}`);
|
|
});
|
|
});
|
|
|
|
|
|
public cacheForChat = (postId: string, target: IChat) => {
|
|
public cacheForChat = (postId: string, target: IChat) => {
|
|
@@ -105,9 +116,9 @@ export default class {
|
|
err ? reject(err) : resolve(res)
|
|
err ? reject(err) : resolve(res)
|
|
)
|
|
)
|
|
).then(res => {
|
|
).then(res => {
|
|
- logger.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
|
|
|
|
|
|
+ loggers.cacher.debug(`cached post ${postId} for ${targetStr}, result: ${res}`);
|
|
}).catch((err: Error) => {
|
|
}).catch((err: Error) => {
|
|
- logger.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
|
|
|
|
|
|
+ loggers.cacher.error(`failed to cache post ${postId} for ${targetStr}, error: ${err}`);
|
|
});
|
|
});
|
|
};
|
|
};
|
|
|
|
|
|
@@ -115,14 +126,15 @@ export default class {
|
|
new Promise<string>((resolve, reject) =>
|
|
new Promise<string>((resolve, reject) =>
|
|
this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))
|
|
this.client.get(`content/${contentId}`, (err, res) => err ? reject(err) : resolve(res))
|
|
).then(res => {
|
|
).then(res => {
|
|
- logger.debug(`retrieved cached content ${contentId}, result: ${res}`);
|
|
|
|
|
|
+ loggers.cacher.debug(`retrieved cached content ${contentId}, result: ${res}`);
|
|
return res;
|
|
return res;
|
|
}).catch((err: Error) => {
|
|
}).catch((err: Error) => {
|
|
- logger.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
|
|
|
|
|
|
+ loggers.cacher.error(`failed to retrieve cached content ${contentId}, error: ${err}`);
|
|
throw err;
|
|
throw err;
|
|
});
|
|
});
|
|
|
|
|
|
public startProcess = (processId: string) => {
|
|
public startProcess = (processId: string) => {
|
|
|
|
+ const logger = loggers.agent;
|
|
this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
|
|
this.client.publish(`twitter:locks`, `WIP:${processId}`, (err, res) => {
|
|
if (err) {
|
|
if (err) {
|
|
return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
|
|
return logger.error(`error notifying subscription client about WIP:${processId}, result: ${res}`);
|
|
@@ -139,6 +151,7 @@ export default class {
|
|
}
|
|
}
|
|
|
|
|
|
public waitForProcess = (processId: string, timeout: number) => {
|
|
public waitForProcess = (processId: string, timeout: number) => {
|
|
|
|
+ const logger = loggers.agent;
|
|
if (!(processId in this.subscriptions)) {
|
|
if (!(processId in this.subscriptions)) {
|
|
logger.debug(`creating new waiting function for ${processId}...`);
|
|
logger.debug(`creating new waiting function for ${processId}...`);
|
|
let timeoutHandle: ReturnType<typeof setTimeout>;
|
|
let timeoutHandle: ReturnType<typeof setTimeout>;
|
|
@@ -200,10 +213,10 @@ export default class {
|
|
return new Promise<number>((resolve, reject) =>
|
|
return new Promise<number>((resolve, reject) =>
|
|
this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))
|
|
this.client.exists(`sent/${targetStr}/${postId}`, (err, res) => err ? reject(err) : resolve(res))
|
|
).then(res => {
|
|
).then(res => {
|
|
- logger.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
|
|
|
|
|
|
+ loggers.cacher.debug(`retrieved status of post ${postId} for ${targetStr}, result: ${res}`);
|
|
return Boolean(res);
|
|
return Boolean(res);
|
|
}).catch((err: Error) => {
|
|
}).catch((err: Error) => {
|
|
- logger.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
|
|
|
|
|
|
+ loggers.cacher.error(`failed to retrieve status of post ${postId} for ${targetStr}, error: ${err}`);
|
|
return false;
|
|
return false;
|
|
});
|
|
});
|
|
};
|
|
};
|