From 85d12e8b19910e10a4fb7bcaae2c69c87b2318e2 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Sun, 2 Jan 2022 00:45:26 +0000 Subject: [PATCH] Rust files --- src/notifications/GitHubWatcher.ts | 147 +++++++++++++++++++ src/notifications/GitLabWatcher.ts | 36 +++++ src/notifications/NotificationWatcherTask.ts | 13 ++ src/notifications/UserNotificationWatcher.ts | 96 ++++++++++++ src/notifications/github_watcher.rs | 44 ++++++ src/notifications/mod.rs | 2 + src/notifications/types.rs | 5 + 7 files changed, 343 insertions(+) create mode 100644 src/notifications/GitHubWatcher.ts create mode 100644 src/notifications/GitLabWatcher.ts create mode 100644 src/notifications/NotificationWatcherTask.ts create mode 100644 src/notifications/UserNotificationWatcher.ts create mode 100644 src/notifications/github_watcher.rs create mode 100644 src/notifications/mod.rs create mode 100644 src/notifications/types.rs diff --git a/src/notifications/GitHubWatcher.ts b/src/notifications/GitHubWatcher.ts new file mode 100644 index 00000000..684a843e --- /dev/null +++ b/src/notifications/GitHubWatcher.ts @@ -0,0 +1,147 @@ +import { Octokit } from "@octokit/rest"; +import { EventEmitter } from "events"; +import { GithubInstance } from "../Github/GithubInstance"; +import LogWrapper from "../LogWrapper"; +import { NotificationWatcherTask } from "./NotificationWatcherTask"; +import { RequestError } from "@octokit/request-error"; +import { GitHubUserNotification } from "../Github/Types"; +import { OctokitResponse } from "@octokit/types"; +import Metrics from "../Metrics"; +const log = new LogWrapper("GitHubWatcher"); + +const GH_API_THRESHOLD = 50; +const GH_API_RETRY_IN = 1000 * 60; + +export class GitHubWatcher extends EventEmitter implements NotificationWatcherTask { + private static apiFailureCount = 0; + private static globalRetryIn = 0; + + public static checkGitHubStatus() { + this.apiFailureCount = Math.min(this.apiFailureCount + 1, GH_API_THRESHOLD); + if (this.apiFailureCount < GH_API_THRESHOLD) { + log.warn(`API Failure count at ${this.apiFailureCount}`); + return; + } + // The API is actively failing. + if (this.globalRetryIn > 0) { + this.globalRetryIn = Date.now() + GH_API_RETRY_IN; + } + log.warn(`API Failure limit reached, holding off new requests for ${GH_API_RETRY_IN / 1000}s`); + Metrics.notificationsServiceUp.set({service: "github"}, 0); + } + + private octoKit: Octokit; + public failureCount = 0; + private interval?: NodeJS.Timeout; + private lastReadTs = 0; + public readonly type = "github"; + public readonly instanceUrl = undefined; + + constructor(token: string, public userId: string, public roomId: string, since: number, private participating = false) { + super(); + this.octoKit = GithubInstance.createUserOctokit(token); + this.lastReadTs = since; + } + + public start(intervalMs: number) { + log.info(`Starting for ${this.userId}`); + this.interval = setInterval(() => { + this.getNotifications(); + }, intervalMs); + this.getNotifications(); + } + + public stop() { + if (this.interval) { + log.info(`Stopping for ${this.userId}`); + clearInterval(this.interval); + } + } + + private handleGitHubFailure(ex: RequestError) { + log.error("An error occured getting notifications:", ex); + if (ex.status === 401 || ex.status === 404) { + log.warn(`Got status ${ex.status} when handing user stream: ${ex.message}`); + this.failureCount++; + } else if (ex.status >= 500) { + setImmediate(() => GitHubWatcher.checkGitHubStatus()); + } + this.emit("fetch_failure", this); + } + + private async getNotifications() { + if (GitHubWatcher.globalRetryIn !== 0 && GitHubWatcher.globalRetryIn > Date.now()) { + log.info(`Not getting notifications for ${this.userId}, API is still down.`); + return; + } + log.debug(`Getting notifications for ${this.userId} ${this.lastReadTs}`); + const since = this.lastReadTs !== 0 ? `&since=${new Date(this.lastReadTs).toISOString()}`: ""; + let response: OctokitResponse; + try { + response = await this.octoKit.request( + `/notifications?participating=${this.participating}${since}`, + ); + Metrics.notificationsServiceUp.set({service: "github"}, 1); + // We were succesful, clear any timeouts. + GitHubWatcher.globalRetryIn = 0; + // To avoid a bouncing issue, gradually reduce the failure count. + GitHubWatcher.apiFailureCount = Math.max(0, GitHubWatcher.apiFailureCount - 2); + } catch (ex) { + await this.handleGitHubFailure(ex as RequestError); + return; + } + this.lastReadTs = Date.now(); + + if (response.data.length) { + log.info(`Got ${response.data.length} notifications for ${this.userId}`); + } + for (const rawEvent of response.data) { + try { + if (rawEvent.subject.url) { + const res = await this.octoKit.request(rawEvent.subject.url); + rawEvent.subject.url_data = res.data; + } + if (rawEvent.subject.latest_comment_url) { + const res = await this.octoKit.request(rawEvent.subject.latest_comment_url); + rawEvent.subject.latest_comment_url_data = res.data; + } + if (rawEvent.reason === "review_requested") { + if (!rawEvent.subject.url_data?.number) { + log.warn("review_requested was missing subject.url_data.number"); + continue; + } + if (!rawEvent.repository.owner) { + log.warn("review_requested was missing repository.owner"); + continue; + } + rawEvent.subject.requested_reviewers = (await this.octoKit.pulls.listRequestedReviewers({ + pull_number: rawEvent.subject.url_data.number, + owner: rawEvent.repository.owner.login, + repo: rawEvent.repository.name, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + })).data as any; + rawEvent.subject.reviews = (await this.octoKit.pulls.listReviews({ + pull_number: rawEvent.subject.url_data.number, + owner: rawEvent.repository.owner.login, + repo: rawEvent.repository.name, + })).data; + } + } catch (ex) { + log.warn(`Failed to pre-process ${rawEvent.id}: ${ex}`); + // We still push + } + log.debug(`Pushing ${rawEvent.id}`); + Metrics.notificationsPush.inc({service: "github"}); + this.emit("new_events", { + eventName: "notifications.user.events", + data: { + roomId: this.roomId, + events: [rawEvent], + lastReadTs: this.lastReadTs, + }, + sender: "GithubWebhooks", + }); + } + } + +} diff --git a/src/notifications/GitLabWatcher.ts b/src/notifications/GitLabWatcher.ts new file mode 100644 index 00000000..e14aa7f5 --- /dev/null +++ b/src/notifications/GitLabWatcher.ts @@ -0,0 +1,36 @@ +import { EventEmitter } from "events"; +import { GitLabClient } from "../Gitlab/Client"; +import LogWrapper from "../LogWrapper"; +import { NotificationWatcherTask } from "./NotificationWatcherTask"; + +const log = new LogWrapper("GitLabWatcher"); + +export class GitLabWatcher extends EventEmitter implements NotificationWatcherTask { + private client: GitLabClient; + private interval?: NodeJS.Timeout; + public readonly type = "gitlab"; + public failureCount = 0; + constructor(token: string, url: string, public userId: string, public roomId: string, public since: number) { + super(); + this.client = new GitLabClient(url, token); + } + + public start(intervalMs: number) { + this.interval = setInterval(() => { + this.getNotifications(); + }, intervalMs); + } + + public stop() { + if (this.interval) { + clearInterval(this.interval); + } + } + + private async getNotifications() { + log.info(`Fetching events from GitLab for ${this.userId}`); + const events = await this.client.getEvents({ + after: new Date(this.since) + }); + } +} \ No newline at end of file diff --git a/src/notifications/NotificationWatcherTask.ts b/src/notifications/NotificationWatcherTask.ts new file mode 100644 index 00000000..942d0490 --- /dev/null +++ b/src/notifications/NotificationWatcherTask.ts @@ -0,0 +1,13 @@ +import { EventEmitter } from "events"; + +type NotificationTypes = "github"|"gitlab"; + +export interface NotificationWatcherTask extends EventEmitter { + userId: string; + type: NotificationTypes; + instanceUrl?: string; + roomId: string; + failureCount: number; + start(intervalMs: number): void; + stop(): void; +} \ No newline at end of file diff --git a/src/notifications/UserNotificationWatcher.ts b/src/notifications/UserNotificationWatcher.ts new file mode 100644 index 00000000..cfe61793 --- /dev/null +++ b/src/notifications/UserNotificationWatcher.ts @@ -0,0 +1,96 @@ +import { NotificationsDisableEvent, NotificationsEnableEvent } from "../Webhooks"; +import LogWrapper from "../LogWrapper"; +import { createMessageQueue, MessageQueue, MessageQueueMessage } from "../MessageQueue"; +import { MessageSenderClient } from "../MatrixSender"; +import { NotificationWatcherTask } from "./NotificationWatcherTask"; +import { GitHubWatcher } from "./GitHubWatcher"; +import { GitHubUserNotification } from "../Github/Types"; +import { GitLabWatcher } from "./GitLabWatcher"; +import { BridgeConfig } from "../Config/Config"; +import Metrics from "../Metrics"; +export interface UserNotificationsEvent { + roomId: string; + lastReadTs: number; + events: GitHubUserNotification[]; +} + +const MIN_INTERVAL_MS = 15000; +const FAILURE_THRESHOLD = 50; + +const log = new LogWrapper("UserNotificationWatcher"); + +export class UserNotificationWatcher { + /* Key: userId:type:instanceUrl */ + private userIntervals = new Map(); + private matrixMessageSender: MessageSenderClient; + private queue: MessageQueue; + + constructor(config: BridgeConfig) { + this.queue = createMessageQueue(config); + this.matrixMessageSender = new MessageSenderClient(this.queue); + } + + private static constructMapKey(userId: string, type: "github"|"gitlab", instanceUrl?: string) { + return `${userId}:${type}:${instanceUrl || ""}`; + } + + public start() { + this.queue.subscribe("notifications.user.*"); + this.queue.on("notifications.user.enable", (msg: MessageQueueMessage) => { + this.addUser(msg.data); + }); + this.queue.on("notifications.user.disable", (msg: MessageQueueMessage) => { + this.removeUser(msg.data.userId, msg.data.type, msg.data.instanceUrl); + }); + } + + public stop() { + [...this.userIntervals.values()].forEach((v) => { + v.stop(); + }); + this.queue.stop ? this.queue.stop() : undefined; + } + + public removeUser(userId: string, type: "github"|"gitlab", instanceUrl?: string) { + const key = UserNotificationWatcher.constructMapKey(userId, type, instanceUrl); + const task = this.userIntervals.get(key); + if (task) { + task.stop(); + this.userIntervals.delete(key); + log.info(`Removed ${key} from the notif queue`); + } + Metrics.notificationsWatchers.set({service: type}, this.userIntervals.size); + } + + private onFetchFailure(task: NotificationWatcherTask) { + if (task.failureCount > FAILURE_THRESHOLD) { + this.removeUser(task.userId, task.type, task.instanceUrl); + this.matrixMessageSender.sendMatrixText( + task.roomId, +`The bridge has been unable to process your notification stream for some time, and has disabled notifications. +Check your token is still valid, and then turn notifications back on.`, "m.notice", + ); + } + } + + public addUser(data: NotificationsEnableEvent) { + let task: NotificationWatcherTask; + const key = UserNotificationWatcher.constructMapKey(data.userId, data.type, data.instanceUrl); + if (data.type === "github") { + task = new GitHubWatcher(data.token, data.userId, data.roomId, data.since, data.filterParticipating); + } else if (data.type === "gitlab" && data.instanceUrl) { + task = new GitLabWatcher(data.token, data.instanceUrl, data.userId, data.roomId, data.since); + } else { + throw Error('Notification type not known'); + } + this.userIntervals.get(key)?.stop(); + task.start(MIN_INTERVAL_MS); + task.on("fetch_failure", this.onFetchFailure.bind(this)); + task.on("new_events", (payload) => { + this.queue.push(payload); + }); + this.userIntervals.set(key, task); + Metrics.notificationsWatchers.set({service: data.type}, this.userIntervals.size); + log.info(`Inserted ${key} into the notif queue`); + } +} diff --git a/src/notifications/github_watcher.rs b/src/notifications/github_watcher.rs new file mode 100644 index 00000000..ad6a24a2 --- /dev/null +++ b/src/notifications/github_watcher.rs @@ -0,0 +1,44 @@ +use clokwerk::Scheduler; +use octorust::{auth::Credentials, Client}; +use super::types::NotificationWatcherTask; + +#[napi] +pub struct GitHubWatcher { + last_read_ts: u64, + since: i64, + user_id: String, + room_id: String, + participating: bool, + github: Client, +} + +#[napi] +impl GitHubWatcher { + #[napi(constructor)] + pub fn new(token: String, user_id: String, room_id: String, since: Option, participating: Option) -> Self { + GitHubWatcher { + last_read_ts: 0, + user_id: user_id, + room_id: room_id, + since: since.unwrap_or(0), + participating: participating.unwrap_or(false), + github: Client::new( + String::from("matrix-hookshot/1.0.0"), + Credentials::Token(token), + ) + } + } +} + +#[napi] +impl NotificationWatcherTask for GitHubWatcher { + + + fn start(interval_ms: usize, scheduler: &Scheduler) { + + } + + fn stop() { + todo!() + } +} diff --git a/src/notifications/mod.rs b/src/notifications/mod.rs new file mode 100644 index 00000000..c8549ca1 --- /dev/null +++ b/src/notifications/mod.rs @@ -0,0 +1,2 @@ +mod types; +mod github_watcher; diff --git a/src/notifications/types.rs b/src/notifications/types.rs new file mode 100644 index 00000000..91dffcf4 --- /dev/null +++ b/src/notifications/types.rs @@ -0,0 +1,5 @@ +use clokwerk::{Scheduler}; +pub trait NotificationWatcherTask { + fn start(interval_ms: usize, scheduler: &Scheduler); + fn stop(); +}