Add support for pushWait message passing

This commit is contained in:
Half-Shot 2019-08-08 14:38:31 +01:00
parent 6644d9f0a8
commit 665214e52f
5 changed files with 142 additions and 17 deletions

View File

@ -11,6 +11,7 @@
"build": "tsc --project tsconfig.json", "build": "tsc --project tsconfig.json",
"start:app": "node lib/App/BridgeApp.js", "start:app": "node lib/App/BridgeApp.js",
"start:webhooks": "node lib/App/GithubWebhookApp.js", "start:webhooks": "node lib/App/GithubWebhookApp.js",
"start:matrixsender": "node lib/App/MatrixSenderApp.js",
"test": "mocha -r ts-node/register tests/**/*.ts", "test": "mocha -r ts-node/register tests/**/*.ts",
"lint": "tslint -p tsconfig.json" "lint": "tslint -p tsconfig.json"
}, },
@ -25,6 +26,7 @@
"mocha": "^6.2.0", "mocha": "^6.2.0",
"node-emoji": "^1.10.0", "node-emoji": "^1.10.0",
"request-promise-native": "^1.0.7", "request-promise-native": "^1.0.7",
"uuid": "^3.3.2",
"winston": "^3.2.1", "winston": "^3.2.1",
"yaml": "^1.6.0" "yaml": "^1.6.0"
}, },
@ -39,6 +41,7 @@
"@types/node": "^12.6.9", "@types/node": "^12.6.9",
"@types/node-emoji": "^1.8.1", "@types/node-emoji": "^1.8.1",
"@types/request-promise-native": "^1.0.16", "@types/request-promise-native": "^1.0.16",
"@types/uuid": "^3.4.5",
"@types/yaml": "^1.0.2", "@types/yaml": "^1.0.2",
"chai": "^4.2.0", "chai": "^4.2.0",
"ts-node": "^8.3.0", "ts-node": "^8.3.0",

View File

@ -1,6 +1,7 @@
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { MessageQueue, MessageQueueMessage } from "./MessageQueue"; import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT } from "./MessageQueue";
import micromatch from "micromatch"; import micromatch from "micromatch";
import uuid from "uuid/v4";
export class LocalMQ extends EventEmitter implements MessageQueue { export class LocalMQ extends EventEmitter implements MessageQueue {
private subs: Set<string>; private subs: Set<string>;
@ -17,10 +18,39 @@ export class LocalMQ extends EventEmitter implements MessageQueue {
this.subs.delete(eventGlob); this.subs.delete(eventGlob);
} }
public push(message: MessageQueueMessage) { public push<T>(message: MessageQueueMessage<T>) {
if (!micromatch.match([...this.subs], message.eventName)) { if (!micromatch.match([...this.subs], message.eventName)) {
return; return;
} }
if (!message.messageId) {
message.messageId = uuid();
}
this.emit(message.eventName, message); this.emit(message.eventName, message);
} }
public async pushWait<T, X>(message: MessageQueueMessage<T>,
timeout: number = DEFAULT_RES_TIMEOUT): Promise<X> {
let awaitResponse: (response: MessageQueueMessage<X>) => void;
let resolve: (value: X) => void;
let timer: NodeJS.Timer;
const p = new Promise<X>((res, rej) => {
resolve = res;
timer = setTimeout(() => {
rej(new Error("Timeout waiting for message queue response"));
}, timeout);
});
awaitResponse = (response: MessageQueueMessage<X>) => {
if (response.messageId === message.messageId) {
clearTimeout(timer);
this.removeListener(`response.${message.eventName}`, awaitResponse);
resolve(response.data);
}
};
this.addListener(`response.${message.eventName}`, awaitResponse);
this.push(message);
return p;
}
} }

View File

@ -2,22 +2,25 @@ import { BridgeConfig } from "../Config";
import { LocalMQ } from "./LocalMQ"; import { LocalMQ } from "./LocalMQ";
import { RedisMQ } from "./RedisQueue"; import { RedisMQ } from "./RedisQueue";
export const DEFAULT_RES_TIMEOUT = 30000;
const staticLocalMq = new LocalMQ(); const staticLocalMq = new LocalMQ();
let staticRedisMq: RedisMQ|null = null; let staticRedisMq: RedisMQ|null = null;
export interface MessageQueueMessage { export interface MessageQueueMessage<T> {
sender: string; sender: string;
eventName: string; eventName: string;
// tslint:disable-next-line: no-any data: T;
data: any;
ts?: number; ts?: number;
messageId?: string;
} }
export interface MessageQueue { export interface MessageQueue {
subscribe: (eventGlob: string) => void; subscribe: (eventGlob: string) => void;
unsubscribe: (eventGlob: string) => void; unsubscribe: (eventGlob: string) => void;
push: (data: MessageQueueMessage) => void; push: <T>(data: MessageQueueMessage<T>) => void;
on: (eventName: string, cb: (data: MessageQueueMessage) => void) => void; pushWait: <T, X>(data: MessageQueueMessage<T>) => Promise<X>;
on: <T>(eventName: string, cb: (data: MessageQueueMessage<T>) => void) => void;
} }
export function createMessageQueue(config: BridgeConfig): MessageQueue { export function createMessageQueue(config: BridgeConfig): MessageQueue {

View File

@ -1,17 +1,20 @@
import { MessageQueue, MessageQueueMessage } from "./MessageQueue"; import { MessageQueue, MessageQueueMessage, DEFAULT_RES_TIMEOUT } from "./MessageQueue";
import { Redis, default as redis } from "ioredis"; import { Redis, default as redis } from "ioredis";
import { BridgeConfig } from "../Config"; import { BridgeConfig } from "../Config";
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { LogWrapper } from "../LogWrapper"; import { LogWrapper } from "../LogWrapper";
import uuid from "uuid/v4";
const log = new LogWrapper("RedisMq"); const log = new LogWrapper("RedisMq");
export class RedisMQ extends EventEmitter implements MessageQueue { export class RedisMQ extends EventEmitter implements MessageQueue {
private redis: Redis; private redisSub: Redis;
private redisPub: Redis;
constructor(config: BridgeConfig) { constructor(config: BridgeConfig) {
super(); super();
this.redis = new redis(config.queue.port, config.queue.host); this.redisSub = new redis(config.queue.port, config.queue.host);
this.redis.on("pmessage", (pattern: string, channel: string, message: string) => { this.redisPub = new redis(config.queue.port, config.queue.host);
this.redisSub.on("pmessage", (pattern: string, channel: string, message: string) => {
const msg = JSON.parse(message); const msg = JSON.parse(message);
const delay = (process.hrtime()[1]) - msg.ts!; const delay = (process.hrtime()[1]) - msg.ts!;
log.debug("Delay: ", delay / 1000000, "ms"); log.debug("Delay: ", delay / 1000000, "ms");
@ -20,19 +23,48 @@ export class RedisMQ extends EventEmitter implements MessageQueue {
} }
public subscribe(eventGlob: string) { public subscribe(eventGlob: string) {
this.redis.psubscribe(eventGlob); this.redisSub.psubscribe(eventGlob);
} }
public unsubscribe(eventGlob: string) { public unsubscribe(eventGlob: string) {
this.redis.punsubscribe(eventGlob); this.redisSub.punsubscribe(eventGlob);
} }
public push(data: MessageQueueMessage) { public push<T>(message: MessageQueueMessage<T>) {
data.ts = process.hrtime()[1]; if (!message.messageId) {
this.redis.publish(data.eventName, JSON.stringify(data)).then(() => { message.messageId = uuid();
log.debug(`Pushed ${data.eventName}`); }
message.ts = process.hrtime()[1];
this.redisPub.publish(message.eventName, JSON.stringify(message)).then(() => {
log.debug(`Pushed ${message.eventName}`);
}).catch((ex) => { }).catch((ex) => {
log.warn("Failed to push an event:", ex); log.warn("Failed to push an event:", ex);
}); });
} }
public async pushWait<T, X>(message: MessageQueueMessage<T>,
timeout: number = DEFAULT_RES_TIMEOUT): Promise<X> {
let awaitResponse: (response: MessageQueueMessage<X>) => void;
let resolve: (value: X) => void;
let timer: NodeJS.Timer;
const p = new Promise<X>((res, rej) => {
resolve = res;
timer = setTimeout(() => {
rej(new Error("Timeout waiting for message queue response"));
}, timeout);
});
awaitResponse = (response: MessageQueueMessage<X>) => {
if (response.messageId === message.messageId) {
clearTimeout(timer);
this.removeListener(`response.${message.eventName}`, awaitResponse);
resolve(response.data);
}
};
this.addListener(`response.${message.eventName}`, awaitResponse);
this.push(message);
return p;
}
} }

57
tests/MessageQueueTest.ts Normal file
View File

@ -0,0 +1,57 @@
import { expect } from "chai";
import { createMessageQueue } from "../src/MessageQueue/MessageQueue";
const mq = createMessageQueue({
queue: {
monolithic: true,
},
// tslint:disable-next-line: no-any
} as any);
describe("MessageQueueTest", () => {
describe("LocalMq", () => {
it("should be able to push an event, and listen for it", (done) => {
mq.subscribe("fakeevent");
mq.on("fakeevent", (msg) => {
expect(msg).to.deep.equal({
sender: "foo",
eventName: "fakeevent",
messageId: "foooo",
data: 51,
});
done();
});
mq.push<number>({
sender: "foo",
eventName: "fakeevent",
messageId: "foooo",
data: 51,
});
});
it("should be able to push an event, and respond to it", async () => {
mq.subscribe("fakeevent2");
mq.subscribe("response.fakeevent2");
mq.on("fakeevent2", (msg) => {
expect(msg).to.deep.equal({
sender: "foo",
eventName: "fakeevent2",
messageId: "foooo",
data: 49,
});
mq.push<string>({
sender: "foo",
eventName: "response.fakeevent2",
messageId: "foooo",
data: "worked",
});
});
const response = await mq.pushWait<number, string>({
sender: "foo",
eventName: "fakeevent2",
messageId: "foooo",
data: 49,
});
expect(response).to.equal("worked");
});
});
});