Try to implement the ability for the MQ to target one worker

This commit is contained in:
Half-Shot 2020-02-25 13:25:21 +00:00
parent daa366b148
commit af907ce134
8 changed files with 64 additions and 20 deletions

View File

@ -138,7 +138,7 @@ export class GithubBridge {
this.queue.on<IOAuthRequest>("oauth.response", async (msg) => { this.queue.on<IOAuthRequest>("oauth.response", async (msg) => {
const adminRoom = [...this.adminRooms.values()].find((r) => r.oauthState === msg.data.state); const adminRoom = [...this.adminRooms.values()].find((r) => r.oauthState === msg.data.state);
this.queue.push<boolean>({ await this.queue.push<boolean>({
data: !!(adminRoom), data: !!(adminRoom),
sender: "GithubBridge", sender: "GithubBridge",
messageId: msg.messageId, messageId: msg.messageId,
@ -549,7 +549,7 @@ export class GithubBridge {
const token = await this.tokenStore.getUserToken(adminRoom.userId); const token = await this.tokenStore.getUserToken(adminRoom.userId);
if (token) { if (token) {
log.info(`Notifications enabled for ${adminRoom.userId} and token was found`); log.info(`Notifications enabled for ${adminRoom.userId} and token was found`);
this.queue.push<NotificationsEnableEvent>({ await this.queue.push<NotificationsEnableEvent>({
eventName: "notifications.user.enable", eventName: "notifications.user.enable",
sender: "GithubBridge", sender: "GithubBridge",
data: { data: {
@ -564,7 +564,7 @@ export class GithubBridge {
log.warn(`Notifications enabled for ${adminRoom.userId} but no token stored!`); log.warn(`Notifications enabled for ${adminRoom.userId} but no token stored!`);
} }
} else { } else {
this.queue.push<NotificationsDisableEvent>({ await this.queue.push<NotificationsDisableEvent>({
eventName: "notifications.user.disable", eventName: "notifications.user.disable",
sender: "GithubBridge", sender: "GithubBridge",
data: { data: {

View File

@ -115,6 +115,8 @@ export class GithubWebhooks extends EventEmitter {
eventName, eventName,
sender: "GithubWebhooks", sender: "GithubWebhooks",
data: body, data: body,
}).catch((err) => {
log.info(`Failed to emit payload: ${err}`);
}); });
} }
} catch (ex) { } catch (ex) {
@ -143,7 +145,7 @@ export class GithubWebhooks extends EventEmitter {
state: req.query.state, state: req.query.state,
})}`); })}`);
const result = qs.parse(accessTokenRes.data) as { access_token: string, token_type: string }; const result = qs.parse(accessTokenRes.data) as { access_token: string, token_type: string };
this.queue.push<IOAuthTokens>({ await this.queue.push<IOAuthTokens>({
eventName: "oauth.tokens", eventName: "oauth.tokens",
sender: "GithubWebhooks", sender: "GithubWebhooks",
data: { state: req.query.state, ... result }, data: { state: req.query.state, ... result },

View File

@ -50,7 +50,7 @@ export class MatrixSender {
const intent = msg.sender ? this.as.getIntentForUserId(msg.sender) : this.as.botIntent; const intent = msg.sender ? this.as.getIntentForUserId(msg.sender) : this.as.botIntent;
const eventId = await intent.underlyingClient.sendEvent(msg.roomId, msg.type, msg.content); const eventId = await intent.underlyingClient.sendEvent(msg.roomId, msg.type, msg.content);
log.info("Sent", eventId); log.info("Sent", eventId);
this.mq.push<IMatrixSendMessageResponse>({ await this.mq.push<IMatrixSendMessageResponse>({
eventName: "response.matrix.message", eventName: "response.matrix.message",
sender: "MatrixSender", sender: "MatrixSender",
data: { data: {

View File

@ -18,7 +18,7 @@ export class LocalMQ extends EventEmitter implements MessageQueue {
this.subs.delete(eventGlob); this.subs.delete(eventGlob);
} }
public push<T>(message: MessageQueueMessage<T>) { public async push<T>(message: MessageQueueMessage<T>) {
if (!micromatch.match([...this.subs], message.eventName)) { if (!micromatch.match([...this.subs], message.eventName)) {
return; return;
} }

View File

@ -13,13 +13,14 @@ export interface MessageQueueMessage<T> {
data: T; data: T;
ts?: number; ts?: number;
messageId?: string; messageId?: string;
for?: string;
} }
export interface MessageQueue { export interface MessageQueue {
subscribe: (eventGlob: string) => void; subscribe: (eventGlob: string) => void;
unsubscribe: (eventGlob: string) => void; unsubscribe: (eventGlob: string) => void;
push: <T>(data: MessageQueueMessage<T>) => void; push: <T>(data: MessageQueueMessage<T>, single?: boolean) => Promise<void>;
pushWait: <T, X>(data: MessageQueueMessage<T>) => Promise<X>; pushWait: <T, X>(data: MessageQueueMessage<T>, timeout?: number, single?: boolean) => Promise<X>;
on: <T>(eventName: string, cb: (data: MessageQueueMessage<T>) => void) => void; on: <T>(eventName: string, cb: (data: MessageQueueMessage<T>) => void) => void;
stop(): void; stop(): void;
} }

View File

@ -7,15 +7,30 @@ import uuid from "uuid/v4";
const log = new LogWrapper("RedisMq"); const log = new LogWrapper("RedisMq");
const CONSUMER_TRACK_PREFIX = "consumers.";
export class RedisMQ extends EventEmitter implements MessageQueue { export class RedisMQ extends EventEmitter implements MessageQueue {
private static removePartsFromEventName(evName: string, partCount: number) {
return evName.split(".").slice(0, -partCount).join(".");
}
private redisSub: Redis; private redisSub: Redis;
private redisPub: Redis; private redisPub: Redis;
private redis: Redis;
private myUuid: string;
constructor(config: BridgeConfig) { constructor(config: BridgeConfig) {
super(); super();
this.redisSub = new redis(config.queue.port, config.queue.host); this.redisSub = new redis(config.queue.port, config.queue.host);
this.redisPub = new redis(config.queue.port, config.queue.host); this.redisPub = new redis(config.queue.port, config.queue.host);
this.redis = new redis(config.queue.port, config.queue.host);
this.myUuid = uuid();
this.redisSub.on("pmessage", (pattern: string, channel: string, message: string) => { this.redisSub.on("pmessage", (pattern: string, channel: string, message: string) => {
const msg = JSON.parse(message); const msg = JSON.parse(message) as MessageQueueMessage<unknown>;
if (msg.for && msg.for !== this.myUuid) {
log.debug(`Got message for ${msg.for}, dropping`);
return;
}
const delay = (process.hrtime()[1]) - msg.ts!; const delay = (process.hrtime()[1]) - msg.ts!;
log.debug("Delay: ", delay / 1000000, "ms"); log.debug("Delay: ", delay / 1000000, "ms");
this.emit(channel, JSON.parse(message)); this.emit(channel, JSON.parse(message));
@ -24,26 +39,39 @@ export class RedisMQ extends EventEmitter implements MessageQueue {
public subscribe(eventGlob: string) { public subscribe(eventGlob: string) {
this.redisSub.psubscribe(eventGlob); this.redisSub.psubscribe(eventGlob);
const consumerName = eventGlob.endsWith("*") ? RedisMQ.removePartsFromEventName(eventGlob, 1) : eventGlob;
this.redis.sadd(`${CONSUMER_TRACK_PREFIX}${consumerName}`, this.myUuid);
} }
public unsubscribe(eventGlob: string) { public unsubscribe(eventGlob: string) {
this.redisSub.punsubscribe(eventGlob); this.redisSub.punsubscribe(eventGlob);
this.redis.srem(`${CONSUMER_TRACK_PREFIX}${eventGlob}`, this.myUuid);
} }
public push<T>(message: MessageQueueMessage<T>) { public async push<T>(message: MessageQueueMessage<T>, single: boolean = false) {
if (!message.messageId) { if (!message.messageId) {
message.messageId = uuid(); message.messageId = uuid();
} }
if (single) {
const recipient = await this.getRecipientForEvent(message.eventName);
if (!recipient) {
throw Error("Cannot find recipient for event");
}
message.for = recipient;
}
message.ts = process.hrtime()[1]; message.ts = process.hrtime()[1];
this.redisPub.publish(message.eventName, JSON.stringify(message)).then(() => { try {
await this.redisPub.publish(message.eventName, JSON.stringify(message));
log.debug(`Pushed ${message.eventName}`); log.debug(`Pushed ${message.eventName}`);
}).catch((ex) => { } catch (ex) {
log.warn("Failed to push an event:", ex); log.warn("Failed to push an event:", ex);
}); throw Error("Failed to push message into queue");
}
} }
public async pushWait<T, X>(message: MessageQueueMessage<T>, public async pushWait<T, X>(message: MessageQueueMessage<T>,
timeout: number = DEFAULT_RES_TIMEOUT): Promise<X> { timeout: number = DEFAULT_RES_TIMEOUT,
single: boolean = false): Promise<X> {
let awaitResponse: (response: MessageQueueMessage<X>) => void; let awaitResponse: (response: MessageQueueMessage<X>) => void;
let resolve: (value: X) => void; let resolve: (value: X) => void;
let timer: NodeJS.Timer; let timer: NodeJS.Timer;
@ -64,7 +92,7 @@ export class RedisMQ extends EventEmitter implements MessageQueue {
}; };
this.addListener(`response.${message.eventName}`, awaitResponse); this.addListener(`response.${message.eventName}`, awaitResponse);
this.push(message); await this.push(message);
return p; return p;
} }
@ -72,4 +100,17 @@ export class RedisMQ extends EventEmitter implements MessageQueue {
this.redisPub.disconnect(); this.redisPub.disconnect();
this.redisSub.disconnect(); this.redisSub.disconnect();
} }
private async getRecipientForEvent(eventName: string): Promise<string|null> {
let recipient = null;
let parts = 0;
const totalParts = eventName.split(".").length;
// Work backwards from the event name.
while (recipient === null && parts < totalParts) {
const evName = RedisMQ.removePartsFromEventName(eventName, parts);
recipient = await this.redis.srandmember(evName) || null;
parts++;
}
return recipient;
}
} }

View File

@ -87,7 +87,7 @@ export class UserNotificationWatcher {
} }
} }
this.queue.push<UserNotificationsEvent>({ await this.queue.push<UserNotificationsEvent>({
eventName: "notifications.user.events", eventName: "notifications.user.events",
data: { data: {
roomId: stream.roomId, roomId: stream.roomId,

View File

@ -10,7 +10,7 @@ const mq = createMessageQueue({
describe("MessageQueueTest", () => { describe("MessageQueueTest", () => {
describe("LocalMq", () => { describe("LocalMq", () => {
it("should be able to push an event, and listen for it", (done) => { it("should be able to push an event, and listen for it", async (done) => {
mq.subscribe("fakeevent"); mq.subscribe("fakeevent");
mq.on("fakeevent", (msg) => { mq.on("fakeevent", (msg) => {
expect(msg).to.deep.equal({ expect(msg).to.deep.equal({
@ -21,7 +21,7 @@ describe("MessageQueueTest", () => {
}); });
done(); done();
}); });
mq.push<number>({ await mq.push<number>({
sender: "foo", sender: "foo",
eventName: "fakeevent", eventName: "fakeevent",
messageId: "foooo", messageId: "foooo",
@ -31,14 +31,14 @@ describe("MessageQueueTest", () => {
it("should be able to push an event, and respond to it", async () => { it("should be able to push an event, and respond to it", async () => {
mq.subscribe("fakeevent2"); mq.subscribe("fakeevent2");
mq.subscribe("response.fakeevent2"); mq.subscribe("response.fakeevent2");
mq.on("fakeevent2", (msg) => { mq.on("fakeevent2", async (msg) => {
expect(msg).to.deep.equal({ expect(msg).to.deep.equal({
sender: "foo", sender: "foo",
eventName: "fakeevent2", eventName: "fakeevent2",
messageId: "foooo", messageId: "foooo",
data: 49, data: 49,
}); });
mq.push<string>({ await mq.push<string>({
sender: "foo", sender: "foo",
eventName: "response.fakeevent2", eventName: "response.fakeevent2",
messageId: "foooo", messageId: "foooo",