Decrease connection creation concurrency when starting up (#614)

* Add a queue system for loading connections on startup

* Retry state requests

* Add ability to retry on some matrix failures

* Add the queue

* changelog

* Add docstring
This commit is contained in:
Will Hunt 2023-01-09 17:57:24 +00:00 committed by GitHub
parent 188eb4004e
commit a5ea88578c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 14 deletions

1
changelog.d/614.bugfix Normal file
View File

@ -0,0 +1 @@
Improve startup stability by not loading all room state at once.

View File

@ -60,6 +60,7 @@
"mime": "^3.0.0",
"node-emoji": "^1.11.0",
"nyc": "^15.1.0",
"p-queue": "^6.6.2",
"prom-client": "^14.0.1",
"reflect-metadata": "^0.1.13",
"rss-parser": "^3.12.0",

View File

@ -38,6 +38,7 @@ import { JiraOAuthRequestCloud, JiraOAuthRequestOnPrem, JiraOAuthRequestResult }
import { GenericWebhookEvent, GenericWebhookEventResult } from "./generic/types";
import { SetupWidget } from "./Widgets/SetupWidget";
import { FeedEntry, FeedError, FeedReader, FeedSuccess } from "./feeds/FeedReader";
import PQueue from "p-queue";
const log = new Logger("Bridge");
export class Bridge {
@ -653,8 +654,10 @@ export class Bridge {
await this.as.botClient.setDisplayName(this.config.bot.displayname);
}
}
await Promise.all(joinedRooms.map(async (roomId) => {
const queue = new PQueue({
concurrency: 2,
});
queue.addAll(joinedRooms.map(((roomId) => async () => {
log.debug("Fetching state for " + roomId);
try {
await connManager.createConnectionsForRoomId(roomId, false);
@ -703,7 +706,7 @@ export class Bridge {
} catch (ex) {
log.error(`Failed to set up admin room ${roomId}:`, ex);
}
}));
})));
// Handle spaces
for (const discussion of connManager.getAllConnectionsOfType(GitHubDiscussionSpace)) {
@ -733,6 +736,8 @@ export class Bridge {
if (this.config.metrics?.enabled) {
this.listener.bindResource('metrics', Metrics.expressRouter);
}
await queue.onIdle();
log.info(`All connections loaded`);
await this.as.begin();
log.info(`Bridge is now ready. Found ${this.connectionManager.size} connections`);
this.ready = true;

View File

@ -4,25 +4,29 @@
* Manages connections between Matrix rooms and the remote side.
*/
import { ApiError, ErrCode } from "./api";
import { Appservice, StateEvent } from "matrix-bot-sdk";
import { CommentProcessor } from "./CommentProcessor";
import { BridgeConfig, BridgePermissionLevel, GitLabInstance } from "./Config/Config";
import { CommentProcessor } from "./CommentProcessor";
import { ConnectionDeclaration, ConnectionDeclarations, GenericHookConnection, GitHubDiscussionConnection, GitHubDiscussionSpace, GitHubIssueConnection, GitHubProjectConnection, GitHubRepoConnection, GitHubUserSpace, GitLabIssueConnection, GitLabRepoConnection, IConnection, IConnectionState, JiraProjectConnection } from "./Connections";
import { GithubInstance } from "./Github/GithubInstance";
import { FigmaFileConnection, FeedConnection } from "./Connections";
import { GetConnectionTypeResponseItem } from "./provisioning/api";
import { GitLabClient } from "./Gitlab/Client";
import { GithubInstance } from "./Github/GithubInstance";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { JiraProject, JiraVersion } from "./Jira/Types";
import { Logger } from "matrix-appservice-bridge";
import { MessageSenderClient } from "./MatrixSender";
import { GetConnectionTypeResponseItem } from "./provisioning/api";
import { ApiError, ErrCode } from "./api";
import { UserTokenStore } from "./UserTokenStore";
import { FigmaFileConnection, FeedConnection } from "./Connections";
import { IBridgeStorageProvider } from "./Stores/StorageProvider";
import { retry, retryMatrixErrorFilter } from "./PromiseUtil";
import Metrics from "./Metrics";
import EventEmitter from "events";
const log = new Logger("ConnectionManager");
const GET_STATE_ATTEMPTS = 5;
const GET_STATE_TIMEOUT_MS = 1000;
export class ConnectionManager extends EventEmitter {
private connections: IConnection[] = [];
public readonly enabledForProvisioning: Record<string, GetConnectionTypeResponseItem> = {};
@ -172,7 +176,14 @@ export class ConnectionManager extends EventEmitter {
public async createConnectionsForRoomId(roomId: string, rollbackBadState: boolean) {
let connectionCreated = false;
const state = await this.as.botClient.getRoomState(roomId);
// This endpoint can be heavy, wrap it in pillows.
const state = await retry(
() => this.as.botClient.getRoomState(roomId),
GET_STATE_ATTEMPTS,
GET_STATE_TIMEOUT_MS,
retryMatrixErrorFilter
);
for (const event of state) {
try {
const conn = await this.createConnectionForState(roomId, new StateEvent(event), rollbackBadState);

View File

@ -1,21 +1,55 @@
import { StatusCodes } from "http-status-codes";
import { Logger } from "matrix-appservice-bridge";
import { MatrixError } from "matrix-bot-sdk";
const SLEEP_TIME_MS = 250;
const DEFAULT_RETRY = () => true;
const log = new Logger("PromiseUtil");
export async function retry<T>(actionFn: () => Promise<T>,
type RetryFn = (error: Error) => boolean|number;
/**
 * Checks errors returned from a Matrix API request, and determines
 * if the error should be retried.
 *
 * A MatrixError with a 4xx status code is treated as permanent, with the
 * exception of 429 (Too Many Requests). Every other error is retryable.
 * @param err An Error object, which may be a MatrixError
 * @returns - `true` if the action should be retried.
 *          - A positive `number` if the action should be retried after that many milliseconds.
 *          - `false` if the action should not be retried.
 */
export function retryMatrixErrorFilter(err: Error): boolean|number {
    if (err instanceof MatrixError && err.statusCode >= 400 && err.statusCode <= 499) {
        if (err.statusCode === StatusCodes.TOO_MANY_REQUESTS) {
            // Only hand back the wait period when it is a usable, positive number.
            // `0` (and NaN) are falsy, so a caller that truth-tests this result would
            // read them as "do not retry"; fall back to `true` so the caller applies
            // its own backoff instead.
            const retryAfterMs = err.retryAfterMs;
            return typeof retryAfterMs === "number" && retryAfterMs > 0 ? retryAfterMs : true;
        }
        // Any other client error will not be fixed by repeating the request.
        return false;
    }
    // Non-Matrix errors and non-4xx responses are assumed to be transient.
    return true;
}
/**
* Runs a function, and retries it if the filter function permits it.
* @param actionFn The action to run
* @param maxAttempts The number of attempts to make before giving up.
* @param waitFor The number of milliseconds to wait between attempts. May be overridden by filterFn.
* @param filterFn A function that checks the error on failure, and determines if the action should be retried. By default, this retries ALL failures.
* @returns The result of actionFn
* @throws If the `maxAttempts` limit is exceeded, or the `filterFn` returns false.
*/
export async function retry<T>(actionFn: () => T,
maxAttempts: number,
waitFor: number = SLEEP_TIME_MS,
filterFn: (err: unknown) => boolean = DEFAULT_RETRY): Promise<T> {
filterFn: RetryFn = DEFAULT_RETRY): Promise<T> {
let attempts = 0;
while (attempts < maxAttempts) {
attempts++;
try {
return await actionFn();
} catch (ex) {
if (filterFn(ex)) {
const timeMs = waitFor * Math.pow(2, attempts);
const shouldRetry = filterFn(ex);
if (shouldRetry) {
// If the filter returns a retry ms, use that.
const timeMs = typeof shouldRetry === "number" ?
shouldRetry : waitFor * Math.pow(2, attempts);
log.warn(`Action failed (${ex}), retrying in ${timeMs}ms`);
await new Promise((r) => setTimeout(r, timeMs));
} else {