diff --git a/src/MessageQueue/LocalMQ.ts b/src/MessageQueue/LocalMQ.ts deleted file mode 100644 index 31fe57f7..00000000 --- a/src/MessageQueue/LocalMQ.ts +++ /dev/null @@ -1,57 +0,0 @@ -import { EventEmitter } from "events"; -import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT } from "./Types"; -import micromatch from "micromatch"; -import {v4 as uuid} from "uuid"; -import Metrics from "../Metrics"; - -export class LocalMQ extends EventEmitter implements MessageQueue { - private subs: Set; - constructor() { - super(); - this.subs = new Set(); - } - - public subscribe(eventGlob: string) { - this.subs.add(eventGlob); - } - - public unsubscribe(eventGlob: string) { - this.subs.delete(eventGlob); - } - - public async push(message: MessageQueueMessage) { - Metrics.messageQueuePushes.inc({event: message.eventName}); - if (!micromatch.match([...this.subs], message.eventName)) { - return; - } - if (!message.messageId) { - message.messageId = uuid(); - } - this.emit(message.eventName, message); - } - - public async pushWait(message: MessageQueueMessage, - timeout: number = DEFAULT_RES_TIMEOUT): Promise { - let resolve: (value: X) => void; - let timer: NodeJS.Timer; - - const p = new Promise((res, rej) => { - resolve = res; - timer = setTimeout(() => { - rej(new Error(`Timeout waiting for message queue response for ${message.eventName} / ${message.messageId}`)); - }, timeout); - }); - - const awaitResponse = (response: MessageQueueMessage) => { - if (response.messageId === message.messageId) { - clearTimeout(timer); - this.removeListener(`response.${message.eventName}`, awaitResponse); - resolve(response.data); - } - }; - - this.addListener(`response.${message.eventName}`, awaitResponse); - this.push(message); - return p; - } -} diff --git a/src/MessageQueue/MessageQueue.ts b/src/MessageQueue/MessageQueue.ts deleted file mode 100644 index 41003256..00000000 --- a/src/MessageQueue/MessageQueue.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { BridgeConfigQueue } from "../Config/Config"; -import { LocalMQ } from "./LocalMQ"; -import { RedisMQ } from "./RedisQueue"; -import { MessageQueue } from "./Types"; - -const staticLocalMq = new LocalMQ(); -let staticRedisMq: RedisMQ|null = null; - -export function createMessageQueue(config: BridgeConfigQueue): MessageQueue { - if (config.monolithic) { - return staticLocalMq; - } - if (staticRedisMq === null) { - staticRedisMq = new RedisMQ(config); - } - return staticRedisMq; -} diff --git a/src/MessageQueue/RedisQueue.ts b/src/MessageQueue/RedisQueue.ts deleted file mode 100644 index 107cf25d..00000000 --- a/src/MessageQueue/RedisQueue.ts +++ /dev/null @@ -1,119 +0,0 @@ - -import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT, MessageQueueMessageOut } from "./Types"; -import { Redis, default as redis } from "ioredis"; -import { BridgeConfig, BridgeConfigQueue } from "../Config/Config"; -import { EventEmitter } from "events"; -import LogWrapper from "../LogWrapper"; - -import {v4 as uuid} from "uuid"; - -const log = new LogWrapper("RedisMq"); - -const CONSUMER_TRACK_PREFIX = "consumers."; - -export class RedisMQ extends EventEmitter implements MessageQueue { - - private static removePartsFromEventName(evName: string, partCount: number) { - return evName.split(".").slice(0, -partCount).join("."); - } - - private redisSub: Redis; - private redisPub: Redis; - private redis: Redis; - private myUuid: string; - constructor(config: BridgeConfigQueue) { - super(); - this.redisSub = new redis(config.port, config.host); - this.redisPub = new redis(config.port, config.host); - this.redis = new redis(config.port, config.host); - this.myUuid = uuid(); - this.redisSub.on("pmessage", (_: string, channel: string, message: string) => { - const msg = JSON.parse(message) as MessageQueueMessageOut; - if (msg.for && msg.for !== this.myUuid) { - log.debug(`Got message for ${msg.for}, dropping`); - return; - } - const delay = (process.hrtime()[1]) - msg.ts; - log.debug("Delay: ", delay / 1000000, "ms"); - this.emit(channel, JSON.parse(message)); - }); - } - - public subscribe(eventGlob: string) { - this.redisSub.psubscribe(eventGlob); - const consumerName = eventGlob.endsWith("*") ? RedisMQ.removePartsFromEventName(eventGlob, 1) : eventGlob; - this.redis.sadd(`${CONSUMER_TRACK_PREFIX}${consumerName}`, this.myUuid); - } - - public unsubscribe(eventGlob: string) { - this.redisSub.punsubscribe(eventGlob); - this.redis.srem(`${CONSUMER_TRACK_PREFIX}${eventGlob}`, this.myUuid); - } - - public async push(message: MessageQueueMessage, single = false) { - if (!message.messageId) { - message.messageId = uuid(); - } - if (single) { - const recipient = await this.getRecipientForEvent(message.eventName); - if (!recipient) { - throw Error("Cannot find recipient for event"); - } - message.for = recipient; - } - const outMsg: MessageQueueMessageOut = { - ...message, - ts: process.hrtime()[1], - } - try { - await this.redisPub.publish(message.eventName, JSON.stringify(outMsg)); - log.debug(`Pushed ${message.eventName}`); - } catch (ex) { - log.warn("Failed to push an event:", ex); - throw Error("Failed to push message into queue"); - } - } - - public async pushWait(message: MessageQueueMessage, - timeout: number = DEFAULT_RES_TIMEOUT): Promise { - let resolve: (value: X) => void; - let timer: NodeJS.Timer; - - const p = new Promise((res, rej) => { - resolve = res; - timer = setTimeout(() => { - rej(new Error("Timeout waiting for message queue response")); - }, timeout); - }); - - const awaitResponse = (response: MessageQueueMessage) => { - if (response.messageId === message.messageId) { - clearTimeout(timer); - this.removeListener(`response.${message.eventName}`, awaitResponse); - resolve(response.data); - } - }; - - this.addListener(`response.${message.eventName}`, awaitResponse); - await this.push(message); - return p; - } - - public stop() { - this.redisPub.disconnect(); - this.redisSub.disconnect(); - } - - private async getRecipientForEvent(eventName: string): Promise { - let recipient = null; - let parts = 0; - const totalParts = eventName.split(".").length; - // Work backwards from the event name. - while (recipient === null && parts < totalParts) { - const evName = RedisMQ.removePartsFromEventName(eventName, parts); - recipient = await this.redis.srandmember(evName) || null; - parts++; - } - return recipient; - } -} diff --git a/src/MessageQueue/index.ts b/src/MessageQueue/index.ts index f0118fa9..635676ee 100644 --- a/src/MessageQueue/index.ts +++ b/src/MessageQueue/index.ts @@ -1,2 +1,19 @@ export * from "./Types"; -export * from "./MessageQueue"; \ No newline at end of file + +import { BridgeConfigQueue } from "../Config/Config"; +import { MonolithMessageQueue } from "./monolithMessageQueue"; +import { RedisMessageQueue } from "./redisMessageQueue"; +import { MessageQueue } from "./Types"; + +const staticLocalMq = new MonolithMessageQueue(); +let staticRedisMq: RedisMessageQueue|null = null; + +export function createMessageQueue(config: BridgeConfigQueue): MessageQueue { + if (config.monolithic) { + return staticLocalMq; + } + if (staticRedisMq === null) { + staticRedisMq = new RedisMessageQueue(config); + } + return staticRedisMq; +} diff --git a/tests/MessageQueue/sharedTests.ts b/tests/MessageQueue/sharedTests.ts new file mode 100644 index 00000000..bf72354e --- /dev/null +++ b/tests/MessageQueue/sharedTests.ts @@ -0,0 +1,79 @@ +import { expect } from "chai"; +import { createMessageQueue, MessageQueue, MessageQueueMessageOut } from "../../src/MessageQueue"; + +let mq: MessageQueue; + +const sharedTests: ([string, () => Promise])[] = [ + ["should be able to push an event, and listen for it", async () => { + mq.subscribe("fakeevent"); + const msgPromise = new Promise>(r => mq.on("fakeevent", (msg) => r(msg))); + const p = mq.push({ + sender: "foo", + eventName: "fakeevent", + messageId: "foooo", + data: 51, + }); + const msg = await msgPromise; + expect(msg.ts).to.be.greaterThan(0); + expect(msg.sender).to.deep.equal('foo'); + expect(msg.eventName).to.deep.equal('fakeevent'); + expect(msg.messageId).to.deep.equal('foooo'); + expect(msg.data).to.deep.equal(51); + await p; + }], + ["should be able to push an event, and respond to it", async () => { + mq.subscribe("fakeevent2"); + mq.subscribe("response.fakeevent2"); + const msgPromise = new Promise>(r => mq.on("fakeevent2", (msg) => r(msg))); + const response = mq.pushWait({ + sender: "foo", + eventName: "fakeevent2", + messageId: "foooo", + data: 49, + }); + + const msg = await msgPromise; + expect(msg.ts).to.be.greaterThan(0); + expect(msg.sender).to.deep.equal('foo'); + expect(msg.eventName).to.deep.equal('fakeevent2'); + expect(msg.messageId).to.deep.equal('foooo'); + expect(msg.data).to.deep.equal(49); + await mq.push({ + sender: "foo", + eventName: "response.fakeevent2", + messageId: "foooo", + data: "worked", + }); + + expect(await response).to.equal("worked"); + }], +] + +describe("MessageQueue", () => { + describe("MonolithMessageQueue", () => { + beforeEach(() => { + mq = createMessageQueue({ + monolithic: true + }); + }) + for (const test of sharedTests) { + it(test[0], test[1]); + } + }); + + const describeFn = process.env.TEST_REDIS_QUEUE_HOST ? describe : xdescribe; + describeFn("RedisMessageQueue", () => { + before(() => { + mq = createMessageQueue({ + monolithic: false, + host: process.env.TEST_REDIS_QUEUE_HOST, + }); + }) + after(() => { + mq.stop?.(); + }) + for (const test of sharedTests) { + it(test[0], test[1]); + } + }); +}); diff --git a/tests/MessageQueueTest.ts b/tests/MessageQueueTest.ts deleted file mode 100644 index fc3a3433..00000000 --- a/tests/MessageQueueTest.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { expect } from "chai"; -import { createMessageQueue } from "../src/MessageQueue/MessageQueue"; - -const mq = createMessageQueue({ - monolithic: true, -}); - -describe("MessageQueueTest", () => { - describe("LocalMq", () => { - it("should be able to push an event, and listen for it", async (done) => { - mq.subscribe("fakeevent"); - mq.on("fakeevent", (msg) => { - expect(msg).to.deep.equal({ - sender: "foo", - eventName: "fakeevent", - messageId: "foooo", - data: 51, - }); - done(); - }); - await mq.push({ - sender: "foo", - eventName: "fakeevent", - messageId: "foooo", - data: 51, - }); - }); - it("should be able to push an event, and respond to it", async () => { - mq.subscribe("fakeevent2"); - mq.subscribe("response.fakeevent2"); - mq.on("fakeevent2", async (msg) => { - expect(msg).to.deep.equal({ - sender: "foo", - eventName: "fakeevent2", - messageId: "foooo", - data: 49, - }); - await mq.push({ - sender: "foo", - eventName: "response.fakeevent2", - messageId: "foooo", - data: "worked", - }); - }); - const response = await mq.pushWait({ - sender: "foo", - eventName: "fakeevent2", - messageId: "foooo", - data: 49, - }); - expect(response).to.equal("worked"); - }); - }); -});