Add message queue tests for Redis

This commit is contained in:
Half-Shot 2022-05-11 16:11:38 +01:00
parent abce225524
commit ca117b04e6
6 changed files with 97 additions and 248 deletions

View File

@ -1,57 +0,0 @@
import { EventEmitter } from "events";
import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT } from "./Types";
import micromatch from "micromatch";
import {v4 as uuid} from "uuid";
import Metrics from "../Metrics";
export class LocalMQ extends EventEmitter implements MessageQueue {
private subs: Set<string>;
constructor() {
super();
this.subs = new Set();
}
public subscribe(eventGlob: string) {
this.subs.add(eventGlob);
}
public unsubscribe(eventGlob: string) {
this.subs.delete(eventGlob);
}
public async push<T>(message: MessageQueueMessage<T>) {
Metrics.messageQueuePushes.inc({event: message.eventName});
if (!micromatch.match([...this.subs], message.eventName)) {
return;
}
if (!message.messageId) {
message.messageId = uuid();
}
this.emit(message.eventName, message);
}
public async pushWait<T, X>(message: MessageQueueMessage<T>,
timeout: number = DEFAULT_RES_TIMEOUT): Promise<X> {
let resolve: (value: X) => void;
let timer: NodeJS.Timer;
const p = new Promise<X>((res, rej) => {
resolve = res;
timer = setTimeout(() => {
rej(new Error(`Timeout waiting for message queue response for ${message.eventName} / ${message.messageId}`));
}, timeout);
});
const awaitResponse = (response: MessageQueueMessage<X>) => {
if (response.messageId === message.messageId) {
clearTimeout(timer);
this.removeListener(`response.${message.eventName}`, awaitResponse);
resolve(response.data);
}
};
this.addListener(`response.${message.eventName}`, awaitResponse);
this.push(message);
return p;
}
}

View File

@ -1,17 +0,0 @@
import { BridgeConfigQueue } from "../Config/Config";
import { LocalMQ } from "./LocalMQ";
import { RedisMQ } from "./RedisQueue";
import { MessageQueue } from "./Types";
const staticLocalMq = new LocalMQ();
let staticRedisMq: RedisMQ|null = null;
export function createMessageQueue(config: BridgeConfigQueue): MessageQueue {
if (config.monolithic) {
return staticLocalMq;
}
if (staticRedisMq === null) {
staticRedisMq = new RedisMQ(config);
}
return staticRedisMq;
}

View File

@ -1,119 +0,0 @@
import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT, MessageQueueMessageOut } from "./Types";
import { Redis, default as redis } from "ioredis";
import { BridgeConfig, BridgeConfigQueue } from "../Config/Config";
import { EventEmitter } from "events";
import LogWrapper from "../LogWrapper";
import {v4 as uuid} from "uuid";
const log = new LogWrapper("RedisMq");
const CONSUMER_TRACK_PREFIX = "consumers.";
export class RedisMQ extends EventEmitter implements MessageQueue {
private static removePartsFromEventName(evName: string, partCount: number) {
return evName.split(".").slice(0, -partCount).join(".");
}
private redisSub: Redis;
private redisPub: Redis;
private redis: Redis;
private myUuid: string;
constructor(config: BridgeConfigQueue) {
super();
this.redisSub = new redis(config.port, config.host);
this.redisPub = new redis(config.port, config.host);
this.redis = new redis(config.port, config.host);
this.myUuid = uuid();
this.redisSub.on("pmessage", (_: string, channel: string, message: string) => {
const msg = JSON.parse(message) as MessageQueueMessageOut<unknown>;
if (msg.for && msg.for !== this.myUuid) {
log.debug(`Got message for ${msg.for}, dropping`);
return;
}
const delay = (process.hrtime()[1]) - msg.ts;
log.debug("Delay: ", delay / 1000000, "ms");
this.emit(channel, JSON.parse(message));
});
}
public subscribe(eventGlob: string) {
this.redisSub.psubscribe(eventGlob);
const consumerName = eventGlob.endsWith("*") ? RedisMQ.removePartsFromEventName(eventGlob, 1) : eventGlob;
this.redis.sadd(`${CONSUMER_TRACK_PREFIX}${consumerName}`, this.myUuid);
}
public unsubscribe(eventGlob: string) {
this.redisSub.punsubscribe(eventGlob);
this.redis.srem(`${CONSUMER_TRACK_PREFIX}${eventGlob}`, this.myUuid);
}
public async push<T>(message: MessageQueueMessage<T>, single = false) {
if (!message.messageId) {
message.messageId = uuid();
}
if (single) {
const recipient = await this.getRecipientForEvent(message.eventName);
if (!recipient) {
throw Error("Cannot find recipient for event");
}
message.for = recipient;
}
const outMsg: MessageQueueMessageOut<T> = {
...message,
ts: process.hrtime()[1],
}
try {
await this.redisPub.publish(message.eventName, JSON.stringify(outMsg));
log.debug(`Pushed ${message.eventName}`);
} catch (ex) {
log.warn("Failed to push an event:", ex);
throw Error("Failed to push message into queue");
}
}
public async pushWait<T, X>(message: MessageQueueMessage<T>,
timeout: number = DEFAULT_RES_TIMEOUT): Promise<X> {
let resolve: (value: X) => void;
let timer: NodeJS.Timer;
const p = new Promise<X>((res, rej) => {
resolve = res;
timer = setTimeout(() => {
rej(new Error("Timeout waiting for message queue response"));
}, timeout);
});
const awaitResponse = (response: MessageQueueMessage<X>) => {
if (response.messageId === message.messageId) {
clearTimeout(timer);
this.removeListener(`response.${message.eventName}`, awaitResponse);
resolve(response.data);
}
};
this.addListener(`response.${message.eventName}`, awaitResponse);
await this.push(message);
return p;
}
public stop() {
this.redisPub.disconnect();
this.redisSub.disconnect();
}
private async getRecipientForEvent(eventName: string): Promise<string|null> {
let recipient = null;
let parts = 0;
const totalParts = eventName.split(".").length;
// Work backwards from the event name.
while (recipient === null && parts < totalParts) {
const evName = RedisMQ.removePartsFromEventName(eventName, parts);
recipient = await this.redis.srandmember(evName) || null;
parts++;
}
return recipient;
}
}

View File

@ -1,2 +1,19 @@
export * from "./Types";
export * from "./MessageQueue";
import { BridgeConfigQueue } from "../Config/Config";
import { MonolithMessageQueue } from "./monolithMessageQueue";
import { RedisMessageQueue } from "./redisMessageQueue";
import { MessageQueue } from "./Types";
const staticLocalMq = new MonolithMessageQueue();
let staticRedisMq: RedisMessageQueue|null = null;
export function createMessageQueue(config: BridgeConfigQueue): MessageQueue {
if (config.monolithic) {
return staticLocalMq;
}
if (staticRedisMq === null) {
staticRedisMq = new RedisMessageQueue(config);
}
return staticRedisMq;
}

View File

@ -0,0 +1,79 @@
import { expect } from "chai";
import { createMessageQueue, MessageQueue, MessageQueueMessageOut } from "../../src/MessageQueue";
let mq: MessageQueue;
const sharedTests: ([string, () => Promise<void>])[] = [
["should be able to push an event, and listen for it", async () => {
mq.subscribe("fakeevent");
const msgPromise = new Promise<MessageQueueMessageOut<unknown>>(r => mq.on("fakeevent", (msg) => r(msg)));
const p = mq.push<number>({
sender: "foo",
eventName: "fakeevent",
messageId: "foooo",
data: 51,
});
const msg = await msgPromise;
expect(msg.ts).to.be.greaterThan(0);
expect(msg.sender).to.deep.equal('foo');
expect(msg.eventName).to.deep.equal('fakeevent');
expect(msg.messageId).to.deep.equal('foooo');
expect(msg.data).to.deep.equal(51);
await p;
}],
["should be able to push an event, and respond to it", async () => {
mq.subscribe("fakeevent2");
mq.subscribe("response.fakeevent2");
const msgPromise = new Promise<MessageQueueMessageOut<unknown>>(r => mq.on("fakeevent2", (msg) => r(msg)));
const response = mq.pushWait<number, string>({
sender: "foo",
eventName: "fakeevent2",
messageId: "foooo",
data: 49,
});
const msg = await msgPromise;
expect(msg.ts).to.be.greaterThan(0);
expect(msg.sender).to.deep.equal('foo');
expect(msg.eventName).to.deep.equal('fakeevent2');
expect(msg.messageId).to.deep.equal('foooo');
expect(msg.data).to.deep.equal(49);
await mq.push<string>({
sender: "foo",
eventName: "response.fakeevent2",
messageId: "foooo",
data: "worked",
});
expect(await response).to.equal("worked");
}],
]
describe("MessageQueue", () => {
describe("MonolithMessageQueue", () => {
beforeEach(() => {
mq = createMessageQueue({
monolithic: true
});
})
for (const test of sharedTests) {
it(test[0], test[1]);
}
});
const describeFn = process.env.TEST_REDIS_QUEUE_HOST ? describe : xdescribe;
describeFn("RedisMessageQueue", () => {
before(() => {
mq = createMessageQueue({
monolithic: false,
host: process.env.TEST_REDIS_QUEUE_HOST,
});
})
after(() => {
mq.stop?.();
})
for (const test of sharedTests) {
it(test[0], test[1]);
}
});
});

View File

@ -1,54 +0,0 @@
import { expect } from "chai";
import { createMessageQueue } from "../src/MessageQueue/MessageQueue";
const mq = createMessageQueue({
monolithic: true,
});
describe("MessageQueueTest", () => {
describe("LocalMq", () => {
it("should be able to push an event, and listen for it", async (done) => {
mq.subscribe("fakeevent");
mq.on("fakeevent", (msg) => {
expect(msg).to.deep.equal({
sender: "foo",
eventName: "fakeevent",
messageId: "foooo",
data: 51,
});
done();
});
await mq.push<number>({
sender: "foo",
eventName: "fakeevent",
messageId: "foooo",
data: 51,
});
});
it("should be able to push an event, and respond to it", async () => {
mq.subscribe("fakeevent2");
mq.subscribe("response.fakeevent2");
mq.on("fakeevent2", async (msg) => {
expect(msg).to.deep.equal({
sender: "foo",
eventName: "fakeevent2",
messageId: "foooo",
data: 49,
});
await mq.push<string>({
sender: "foo",
eventName: "response.fakeevent2",
messageId: "foooo",
data: "worked",
});
});
const response = await mq.pushWait<number, string>({
sender: "foo",
eventName: "fakeevent2",
messageId: "foooo",
data: 49,
});
expect(response).to.equal("worked");
});
});
});