Add support for multiprocessing

This commit is contained in:
Half-Shot 2019-08-05 15:11:42 +01:00
parent 3a376322e2
commit fea2bd7f65
8 changed files with 143 additions and 10 deletions

View File

@ -14,14 +14,18 @@
"dependencies": {
"@octokit/rest": "^16.28.7",
"@types/express": "^4.17.0",
"@types/ioredis": "^4.0.13",
"@types/markdown-it": "^0.0.8",
"express": "^4.17.1",
"ioredis": "^4.14.0",
"markdown-it": "^9.0.1",
"matrix-bot-sdk": "^0.3.9",
"micromatch": "^4.0.2",
"winston": "^3.2.1",
"yaml": "^1.6.0"
},
"devDependencies": {
"@types/micromatch": "^3.1.0",
"@types/node": "^12.6.9",
"@types/yaml": "^1.0.2",
"typescript": "^3.5.3"

View File

@ -0,0 +1,20 @@
import { parseConfig } from "../Config";
import { GithubWebhooks } from "../GithubWebhooks";
class GithubWebhookApp {
constructor () {
}
public async start() {
const configFile = process.argv[2] || "./config.yml";
const config = await parseConfig(configFile);
const webhookHandler = new GithubWebhooks(config);
webhookHandler.listen();
}
}
new GithubWebhookApp().start().catch((ex) => {
console.error("Bridge encountered an error and has stopped:", ex);
});

View File

@ -20,6 +20,11 @@ export interface BridgeConfig {
port: number,
bindAddress: string
},
queue: {
monolithic: boolean,
port?: number,
host?: string,
}
}
export async function parseRegistrationFile(filename: string) {
@ -29,5 +34,9 @@ export async function parseRegistrationFile(filename: string) {
export async function parseConfig(filename: string) {
const file = await fs.readFile(filename, "utf-8");
return YAML.parse(file) as BridgeConfig;
const config = YAML.parse(file) as BridgeConfig;
config.queue = config.queue || {
monolithic: true,
};
return config;
}

View File

@ -3,6 +3,7 @@ import { Application, default as express, Request, Response } from "express";
import { createHmac } from "crypto";
import { IssuesGetResponse, ReposGetResponse, IssuesGetResponseUser, IssuesGetCommentResponse } from "@octokit/rest";
import { EventEmitter } from "events";
import { MessageQueue, createMessageQueue } from "./MessageQueue/MessageQueue";
export interface IWebhookEvent {
action: string;
@ -14,6 +15,7 @@ export interface IWebhookEvent {
export class GithubWebhooks extends EventEmitter {
private expressApp: Application;
private queue: MessageQueue;
constructor(private config: BridgeConfig) {
super();
this.expressApp = express();
@ -21,6 +23,7 @@ export class GithubWebhooks extends EventEmitter {
verify: this.verifyRequest.bind(this),
}));
this.expressApp.post("/", this.onPayload.bind(this));
this.queue = createMessageQueue(config);
}
listen() {
@ -34,11 +37,12 @@ export class GithubWebhooks extends EventEmitter {
const body = req.body as IWebhookEvent;
console.log("Got", body.action);
try {
console.log(body);
if (body.action === "created" && body.comment) {
this.emit(`comment.created`, body);
} else {
this.emit(`${body.action}`, body);
this.queue.push({
eventName: "comment.created",
sender: "GithubWebhooks",
data: body,
})
}
} catch (ex) {
console.error("Failed to emit");

View File

@ -0,0 +1,26 @@
import { EventEmitter } from "events";
import { MessageQueue, MessageQueueMessage } from "./MessageQueue";
import micromatch from "micromatch";
export class LocalMQ extends EventEmitter implements MessageQueue {
private subs: Set<string>;
constructor() {
super();
this.subs = new Set();
}
public subscribe(eventGlob: string) {
this.subs.add(eventGlob);
}
public unsubscribe(eventGlob: string) {
this.subs.delete(eventGlob);
}
public push(message: MessageQueueMessage) {
if (!micromatch.match([...this.subs], message.eventName)) {
return;
}
this.emit(message.eventName, message);
}
}

View File

@ -0,0 +1,30 @@
import { BridgeConfig } from "../Config";
import { LocalMQ } from "./LocalMQ";
import { RedisMQ } from "./RedisQueue";
let staticLocalMq = new LocalMQ();
let staticRedisMq: RedisMQ|null = null;
export interface MessageQueueMessage {
sender: string,
eventName: string,
data: any,
ts?: number,
}
export interface MessageQueue {
subscribe: (eventGlob: string) => void,
unsubscribe: (eventGlob: string) => void,
push: (data: MessageQueueMessage) => void,
on: (eventName: string, cb: (data: MessageQueueMessage) => void) => void
}
export function createMessageQueue(config: BridgeConfig): MessageQueue {
if (config.queue.monolithic) {
return staticLocalMq;
}
if (staticRedisMq === null) {
staticRedisMq = new RedisMQ(config);
}
return staticRedisMq;
}

View File

@ -0,0 +1,31 @@
import { MessageQueue, MessageQueueMessage } from "./MessageQueue";
import { Redis, default as redis } from "ioredis";
import { BridgeConfig } from "../Config";
import { EventEmitter } from "events";
export class RedisMQ extends EventEmitter implements MessageQueue {
private redis: Redis;
constructor(config: BridgeConfig) {
super();
this.redis = new redis(config.queue.port, config.queue.host);
this.redis.on("pmessage", (_pattern: string, channel: string, message: string) => {
const msg = JSON.parse(message);
const delay = (process.hrtime()[1]) - msg.ts!;
console.log("Delay: ", delay / 1000000, "ms");
this.emit(channel, JSON.parse(message));
});
}
public subscribe (eventGlob: string) {
this.redis.psubscribe(eventGlob);
}
public unsubscribe (eventGlob: string) {
this.redis.punsubscribe(eventGlob);
}
public push (data: MessageQueueMessage) {
data.ts = process.hrtime()[1];
this.redis.publish(data.eventName, JSON.stringify(data));
}
}

View File

@ -6,6 +6,7 @@ import { IBridgeRoomState, BRIDGE_STATE_TYPE } from "./BridgeState";
import { BridgeConfig, parseConfig, parseRegistrationFile } from "./Config";
import { GithubWebhooks, IWebhookEvent } from "./GithubWebhooks";
import { CommentProcessor } from "./CommentProcessor";
import { MessageQueue, createMessageQueue, MessageQueueMessage } from "./MessageQueue/MessageQueue";
const md = new markdown();
@ -26,8 +27,8 @@ export class GithubBridge {
private roomIdtoBridgeState: Map<string, IBridgeRoomState[]>;
private orgRepoIssueToRoomId: Map<string, string>;
private matrixHandledEvents: Set<string>;
private webhookHandler?: GithubWebhooks;
private commentProcessor!: CommentProcessor;
private queue!: MessageQueue;
constructor () {
this.roomIdtoBridgeState = new Map();
@ -39,6 +40,9 @@ export class GithubBridge {
const configFile = process.argv[2] || "./config.yml";
const registrationFile = process.argv[3] || "./registration.yml";
this.config = await parseConfig(configFile);
this.queue = createMessageQueue(this.config);
const registration = await parseRegistrationFile(registrationFile);
this.octokit = new Octokit({
auth: this.config.github.auth,
@ -62,12 +66,17 @@ export class GithubBridge {
this.onRoomEvent(roomId, event);
});
if (this.config.github.webhook) {
this.webhookHandler = new GithubWebhooks(this.config);
this.webhookHandler.listen();
this.webhookHandler.on("comment.created", this.onCommentCreated.bind(this));
if (this.config.github.webhook && this.config.queue.monolithic) {
const webhookHandler = new GithubWebhooks(this.config);
webhookHandler.listen();
}
this.queue.subscribe("comment.*");
this.queue.on("comment.created", (msg: MessageQueueMessage) => {
this.onCommentCreated(msg.data);
});
// Fetch all room state
const joinedRooms = await this.as.botIntent.underlyingClient.getJoinedRooms();
for (const roomId of joinedRooms) {