diff --git a/src/GithubBridge.ts b/src/GithubBridge.ts index ddee7794..c034c319 100644 --- a/src/GithubBridge.ts +++ b/src/GithubBridge.ts @@ -138,7 +138,7 @@ export class GithubBridge { this.queue.on("oauth.response", async (msg) => { const adminRoom = [...this.adminRooms.values()].find((r) => r.oauthState === msg.data.state); - this.queue.push({ + await this.queue.push({ data: !!(adminRoom), sender: "GithubBridge", messageId: msg.messageId, @@ -549,7 +549,7 @@ export class GithubBridge { const token = await this.tokenStore.getUserToken(adminRoom.userId); if (token) { log.info(`Notifications enabled for ${adminRoom.userId} and token was found`); - this.queue.push({ + await this.queue.push({ eventName: "notifications.user.enable", sender: "GithubBridge", data: { @@ -564,7 +564,7 @@ export class GithubBridge { log.warn(`Notifications enabled for ${adminRoom.userId} but no token stored!`); } } else { - this.queue.push({ + await this.queue.push({ eventName: "notifications.user.disable", sender: "GithubBridge", data: { diff --git a/src/GithubWebhooks.ts b/src/GithubWebhooks.ts index 14bd82ec..0d371549 100644 --- a/src/GithubWebhooks.ts +++ b/src/GithubWebhooks.ts @@ -115,6 +115,8 @@ export class GithubWebhooks extends EventEmitter { eventName, sender: "GithubWebhooks", data: body, + }).catch((err) => { + log.info(`Failed to emit payload: ${err}`); }); } } catch (ex) { @@ -143,7 +145,7 @@ export class GithubWebhooks extends EventEmitter { state: req.query.state, })}`); const result = qs.parse(accessTokenRes.data) as { access_token: string, token_type: string }; - this.queue.push({ + await this.queue.push({ eventName: "oauth.tokens", sender: "GithubWebhooks", data: { state: req.query.state, ... result }, diff --git a/src/MatrixSender.ts b/src/MatrixSender.ts index a795ae7c..66e1f0d8 100644 --- a/src/MatrixSender.ts +++ b/src/MatrixSender.ts @@ -50,7 +50,7 @@ export class MatrixSender { const intent = msg.sender ? this.as.getIntentForUserId(msg.sender) : this.as.botIntent; const eventId = await intent.underlyingClient.sendEvent(msg.roomId, msg.type, msg.content); log.info("Sent", eventId); - this.mq.push({ + await this.mq.push({ eventName: "response.matrix.message", sender: "MatrixSender", data: { diff --git a/src/MessageQueue/LocalMQ.ts b/src/MessageQueue/LocalMQ.ts index 84e138c8..eb44792f 100644 --- a/src/MessageQueue/LocalMQ.ts +++ b/src/MessageQueue/LocalMQ.ts @@ -18,7 +18,7 @@ export class LocalMQ extends EventEmitter implements MessageQueue { this.subs.delete(eventGlob); } - public push(message: MessageQueueMessage) { + public async push(message: MessageQueueMessage) { if (!micromatch.match([...this.subs], message.eventName)) { return; } diff --git a/src/MessageQueue/MessageQueue.ts b/src/MessageQueue/MessageQueue.ts index 58748d8f..2b7a3302 100644 --- a/src/MessageQueue/MessageQueue.ts +++ b/src/MessageQueue/MessageQueue.ts @@ -13,13 +13,14 @@ export interface MessageQueueMessage { data: T; ts?: number; messageId?: string; + for?: string; } export interface MessageQueue { subscribe: (eventGlob: string) => void; unsubscribe: (eventGlob: string) => void; - push: (data: MessageQueueMessage) => void; - pushWait: (data: MessageQueueMessage) => Promise; + push: (data: MessageQueueMessage, single?: boolean) => Promise; + pushWait: (data: MessageQueueMessage, timeout?: number, single?: boolean) => Promise; on: (eventName: string, cb: (data: MessageQueueMessage) => void) => void; stop(): void; } diff --git a/src/MessageQueue/RedisQueue.ts b/src/MessageQueue/RedisQueue.ts index 1f6f8bb3..3becbd7b 100644 --- a/src/MessageQueue/RedisQueue.ts +++ b/src/MessageQueue/RedisQueue.ts @@ -7,15 +7,30 @@ import uuid from "uuid/v4"; const log = new LogWrapper("RedisMq"); +const CONSUMER_TRACK_PREFIX = "consumers."; + export class RedisMQ extends EventEmitter implements MessageQueue { + + private static removePartsFromEventName(evName: string, partCount: number) { + return evName.split(".").slice(0, -partCount).join("."); + } + private redisSub: Redis; private redisPub: Redis; + private redis: Redis; + private myUuid: string; constructor(config: BridgeConfig) { super(); this.redisSub = new redis(config.queue.port, config.queue.host); this.redisPub = new redis(config.queue.port, config.queue.host); + this.redis = new redis(config.queue.port, config.queue.host); + this.myUuid = uuid(); this.redisSub.on("pmessage", (pattern: string, channel: string, message: string) => { - const msg = JSON.parse(message); + const msg = JSON.parse(message) as MessageQueueMessage; + if (msg.for && msg.for !== this.myUuid) { + log.debug(`Got message for ${msg.for}, dropping`); + return; + } const delay = (process.hrtime()[1]) - msg.ts!; log.debug("Delay: ", delay / 1000000, "ms"); this.emit(channel, JSON.parse(message)); @@ -24,26 +39,39 @@ export class RedisMQ extends EventEmitter implements MessageQueue { public subscribe(eventGlob: string) { this.redisSub.psubscribe(eventGlob); + const consumerName = eventGlob.endsWith("*") ? RedisMQ.removePartsFromEventName(eventGlob, 1) : eventGlob; + this.redis.sadd(`${CONSUMER_TRACK_PREFIX}${consumerName}`, this.myUuid); } public unsubscribe(eventGlob: string) { this.redisSub.punsubscribe(eventGlob); + this.redis.srem(`${CONSUMER_TRACK_PREFIX}${eventGlob}`, this.myUuid); } - public push(message: MessageQueueMessage) { + public async push(message: MessageQueueMessage, single: boolean = false) { if (!message.messageId) { message.messageId = uuid(); } + if (single) { + const recipient = await this.getRecipientForEvent(message.eventName); + if (!recipient) { + throw Error("Cannot find recipient for event"); + } + message.for = recipient; + } message.ts = process.hrtime()[1]; - this.redisPub.publish(message.eventName, JSON.stringify(message)).then(() => { + try { + await this.redisPub.publish(message.eventName, JSON.stringify(message)); log.debug(`Pushed ${message.eventName}`); - }).catch((ex) => { + } catch (ex) { log.warn("Failed to push an event:", ex); - }); + throw Error("Failed to push message into queue"); + } } public async pushWait(message: MessageQueueMessage, - timeout: number = DEFAULT_RES_TIMEOUT): Promise { + timeout: number = DEFAULT_RES_TIMEOUT, + single: boolean = false): Promise { let awaitResponse: (response: MessageQueueMessage) => void; let resolve: (value: X) => void; let timer: NodeJS.Timer; @@ -64,7 +92,7 @@ export class RedisMQ extends EventEmitter implements MessageQueue { }; this.addListener(`response.${message.eventName}`, awaitResponse); - this.push(message); + await this.push(message); return p; } @@ -72,4 +100,17 @@ export class RedisMQ extends EventEmitter implements MessageQueue { this.redisPub.disconnect(); this.redisSub.disconnect(); } + + private async getRecipientForEvent(eventName: string): Promise { + let recipient = null; + let parts = 0; + const totalParts = eventName.split(".").length; + // Work backwards from the event name. + while (recipient === null && parts < totalParts) { + const evName = RedisMQ.removePartsFromEventName(eventName, parts); + recipient = await this.redis.srandmember(evName) || null; + parts++; + } + return recipient; + } } diff --git a/src/UserNotificationWatcher.ts b/src/UserNotificationWatcher.ts index 39721663..c493bb9c 100644 --- a/src/UserNotificationWatcher.ts +++ b/src/UserNotificationWatcher.ts @@ -87,7 +87,7 @@ export class UserNotificationWatcher { } } - this.queue.push({ + await this.queue.push({ eventName: "notifications.user.events", data: { roomId: stream.roomId, diff --git a/tests/MessageQueueTest.ts b/tests/MessageQueueTest.ts index 5e196f5f..9bf8e53c 100644 --- a/tests/MessageQueueTest.ts +++ b/tests/MessageQueueTest.ts @@ -10,7 +10,7 @@ const mq = createMessageQueue({ describe("MessageQueueTest", () => { describe("LocalMq", () => { - it("should be able to push an event, and listen for it", (done) => { + it("should be able to push an event, and listen for it", async (done) => { mq.subscribe("fakeevent"); mq.on("fakeevent", (msg) => { expect(msg).to.deep.equal({ @@ -21,7 +21,7 @@ describe("MessageQueueTest", () => { }); done(); }); - mq.push({ + await mq.push({ sender: "foo", eventName: "fakeevent", messageId: "foooo", @@ -31,14 +31,14 @@ describe("MessageQueueTest", () => { it("should be able to push an event, and respond to it", async () => { mq.subscribe("fakeevent2"); mq.subscribe("response.fakeevent2"); - mq.on("fakeevent2", (msg) => { + mq.on("fakeevent2", async (msg) => { expect(msg).to.deep.equal({ sender: "foo", eventName: "fakeevent2", messageId: "foooo", data: 49, }); - mq.push({ + await mq.push({ sender: "foo", eventName: "response.fakeevent2", messageId: "foooo",