Rust files

This commit is contained in:
Will Hunt 2022-01-02 00:45:26 +00:00
parent cb4f5eca98
commit 85d12e8b19
7 changed files with 343 additions and 0 deletions

View File

@ -0,0 +1,147 @@
import { Octokit } from "@octokit/rest";
import { EventEmitter } from "events";
import { GithubInstance } from "../Github/GithubInstance";
import LogWrapper from "../LogWrapper";
import { NotificationWatcherTask } from "./NotificationWatcherTask";
import { RequestError } from "@octokit/request-error";
import { GitHubUserNotification } from "../Github/Types";
import { OctokitResponse } from "@octokit/types";
import Metrics from "../Metrics";
const log = new LogWrapper("GitHubWatcher");
const GH_API_THRESHOLD = 50;
const GH_API_RETRY_IN = 1000 * 60;
export class GitHubWatcher extends EventEmitter implements NotificationWatcherTask {
private static apiFailureCount = 0;
private static globalRetryIn = 0;
public static checkGitHubStatus() {
this.apiFailureCount = Math.min(this.apiFailureCount + 1, GH_API_THRESHOLD);
if (this.apiFailureCount < GH_API_THRESHOLD) {
log.warn(`API Failure count at ${this.apiFailureCount}`);
return;
}
// The API is actively failing.
if (this.globalRetryIn > 0) {
this.globalRetryIn = Date.now() + GH_API_RETRY_IN;
}
log.warn(`API Failure limit reached, holding off new requests for ${GH_API_RETRY_IN / 1000}s`);
Metrics.notificationsServiceUp.set({service: "github"}, 0);
}
private octoKit: Octokit;
public failureCount = 0;
private interval?: NodeJS.Timeout;
private lastReadTs = 0;
public readonly type = "github";
public readonly instanceUrl = undefined;
constructor(token: string, public userId: string, public roomId: string, since: number, private participating = false) {
super();
this.octoKit = GithubInstance.createUserOctokit(token);
this.lastReadTs = since;
}
public start(intervalMs: number) {
log.info(`Starting for ${this.userId}`);
this.interval = setInterval(() => {
this.getNotifications();
}, intervalMs);
this.getNotifications();
}
public stop() {
if (this.interval) {
log.info(`Stopping for ${this.userId}`);
clearInterval(this.interval);
}
}
private handleGitHubFailure(ex: RequestError) {
log.error("An error occured getting notifications:", ex);
if (ex.status === 401 || ex.status === 404) {
log.warn(`Got status ${ex.status} when handing user stream: ${ex.message}`);
this.failureCount++;
} else if (ex.status >= 500) {
setImmediate(() => GitHubWatcher.checkGitHubStatus());
}
this.emit("fetch_failure", this);
}
private async getNotifications() {
if (GitHubWatcher.globalRetryIn !== 0 && GitHubWatcher.globalRetryIn > Date.now()) {
log.info(`Not getting notifications for ${this.userId}, API is still down.`);
return;
}
log.debug(`Getting notifications for ${this.userId} ${this.lastReadTs}`);
const since = this.lastReadTs !== 0 ? `&since=${new Date(this.lastReadTs).toISOString()}`: "";
let response: OctokitResponse<GitHubUserNotification[]>;
try {
response = await this.octoKit.request(
`/notifications?participating=${this.participating}${since}`,
);
Metrics.notificationsServiceUp.set({service: "github"}, 1);
// We were succesful, clear any timeouts.
GitHubWatcher.globalRetryIn = 0;
// To avoid a bouncing issue, gradually reduce the failure count.
GitHubWatcher.apiFailureCount = Math.max(0, GitHubWatcher.apiFailureCount - 2);
} catch (ex) {
await this.handleGitHubFailure(ex as RequestError);
return;
}
this.lastReadTs = Date.now();
if (response.data.length) {
log.info(`Got ${response.data.length} notifications for ${this.userId}`);
}
for (const rawEvent of response.data) {
try {
if (rawEvent.subject.url) {
const res = await this.octoKit.request(rawEvent.subject.url);
rawEvent.subject.url_data = res.data;
}
if (rawEvent.subject.latest_comment_url) {
const res = await this.octoKit.request(rawEvent.subject.latest_comment_url);
rawEvent.subject.latest_comment_url_data = res.data;
}
if (rawEvent.reason === "review_requested") {
if (!rawEvent.subject.url_data?.number) {
log.warn("review_requested was missing subject.url_data.number");
continue;
}
if (!rawEvent.repository.owner) {
log.warn("review_requested was missing repository.owner");
continue;
}
rawEvent.subject.requested_reviewers = (await this.octoKit.pulls.listRequestedReviewers({
pull_number: rawEvent.subject.url_data.number,
owner: rawEvent.repository.owner.login,
repo: rawEvent.repository.name,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
})).data as any;
rawEvent.subject.reviews = (await this.octoKit.pulls.listReviews({
pull_number: rawEvent.subject.url_data.number,
owner: rawEvent.repository.owner.login,
repo: rawEvent.repository.name,
})).data;
}
} catch (ex) {
log.warn(`Failed to pre-process ${rawEvent.id}: ${ex}`);
// We still push
}
log.debug(`Pushing ${rawEvent.id}`);
Metrics.notificationsPush.inc({service: "github"});
this.emit("new_events", {
eventName: "notifications.user.events",
data: {
roomId: this.roomId,
events: [rawEvent],
lastReadTs: this.lastReadTs,
},
sender: "GithubWebhooks",
});
}
}
}

View File

@ -0,0 +1,36 @@
import { EventEmitter } from "events";
import { GitLabClient } from "../Gitlab/Client";
import LogWrapper from "../LogWrapper";
import { NotificationWatcherTask } from "./NotificationWatcherTask";
const log = new LogWrapper("GitLabWatcher");
export class GitLabWatcher extends EventEmitter implements NotificationWatcherTask {
private client: GitLabClient;
private interval?: NodeJS.Timeout;
public readonly type = "gitlab";
public failureCount = 0;
constructor(token: string, url: string, public userId: string, public roomId: string, public since: number) {
super();
this.client = new GitLabClient(url, token);
}
public start(intervalMs: number) {
this.interval = setInterval(() => {
this.getNotifications();
}, intervalMs);
}
public stop() {
if (this.interval) {
clearInterval(this.interval);
}
}
private async getNotifications() {
log.info(`Fetching events from GitLab for ${this.userId}`);
const events = await this.client.getEvents({
after: new Date(this.since)
});
}
}

View File

@ -0,0 +1,13 @@
import { EventEmitter } from "events";
type NotificationTypes = "github"|"gitlab";
export interface NotificationWatcherTask extends EventEmitter {
userId: string;
type: NotificationTypes;
instanceUrl?: string;
roomId: string;
failureCount: number;
start(intervalMs: number): void;
stop(): void;
}

View File

@ -0,0 +1,96 @@
import { NotificationsDisableEvent, NotificationsEnableEvent } from "../Webhooks";
import LogWrapper from "../LogWrapper";
import { createMessageQueue, MessageQueue, MessageQueueMessage } from "../MessageQueue";
import { MessageSenderClient } from "../MatrixSender";
import { NotificationWatcherTask } from "./NotificationWatcherTask";
import { GitHubWatcher } from "./GitHubWatcher";
import { GitHubUserNotification } from "../Github/Types";
import { GitLabWatcher } from "./GitLabWatcher";
import { BridgeConfig } from "../Config/Config";
import Metrics from "../Metrics";
export interface UserNotificationsEvent {
roomId: string;
lastReadTs: number;
events: GitHubUserNotification[];
}
const MIN_INTERVAL_MS = 15000;
const FAILURE_THRESHOLD = 50;
const log = new LogWrapper("UserNotificationWatcher");
export class UserNotificationWatcher {
/* Key: userId:type:instanceUrl */
private userIntervals = new Map<string, NotificationWatcherTask>();
private matrixMessageSender: MessageSenderClient;
private queue: MessageQueue;
constructor(config: BridgeConfig) {
this.queue = createMessageQueue(config);
this.matrixMessageSender = new MessageSenderClient(this.queue);
}
private static constructMapKey(userId: string, type: "github"|"gitlab", instanceUrl?: string) {
return `${userId}:${type}:${instanceUrl || ""}`;
}
public start() {
this.queue.subscribe("notifications.user.*");
this.queue.on("notifications.user.enable", (msg: MessageQueueMessage<NotificationsEnableEvent>) => {
this.addUser(msg.data);
});
this.queue.on("notifications.user.disable", (msg: MessageQueueMessage<NotificationsDisableEvent>) => {
this.removeUser(msg.data.userId, msg.data.type, msg.data.instanceUrl);
});
}
public stop() {
[...this.userIntervals.values()].forEach((v) => {
v.stop();
});
this.queue.stop ? this.queue.stop() : undefined;
}
public removeUser(userId: string, type: "github"|"gitlab", instanceUrl?: string) {
const key = UserNotificationWatcher.constructMapKey(userId, type, instanceUrl);
const task = this.userIntervals.get(key);
if (task) {
task.stop();
this.userIntervals.delete(key);
log.info(`Removed ${key} from the notif queue`);
}
Metrics.notificationsWatchers.set({service: type}, this.userIntervals.size);
}
private onFetchFailure(task: NotificationWatcherTask) {
if (task.failureCount > FAILURE_THRESHOLD) {
this.removeUser(task.userId, task.type, task.instanceUrl);
this.matrixMessageSender.sendMatrixText(
task.roomId,
`The bridge has been unable to process your notification stream for some time, and has disabled notifications.
Check your token is still valid, and then turn notifications back on.`, "m.notice",
);
}
}
public addUser(data: NotificationsEnableEvent) {
let task: NotificationWatcherTask;
const key = UserNotificationWatcher.constructMapKey(data.userId, data.type, data.instanceUrl);
if (data.type === "github") {
task = new GitHubWatcher(data.token, data.userId, data.roomId, data.since, data.filterParticipating);
} else if (data.type === "gitlab" && data.instanceUrl) {
task = new GitLabWatcher(data.token, data.instanceUrl, data.userId, data.roomId, data.since);
} else {
throw Error('Notification type not known');
}
this.userIntervals.get(key)?.stop();
task.start(MIN_INTERVAL_MS);
task.on("fetch_failure", this.onFetchFailure.bind(this));
task.on("new_events", (payload) => {
this.queue.push<UserNotificationsEvent>(payload);
});
this.userIntervals.set(key, task);
Metrics.notificationsWatchers.set({service: data.type}, this.userIntervals.size);
log.info(`Inserted ${key} into the notif queue`);
}
}

View File

@ -0,0 +1,44 @@
use clokwerk::Scheduler;
use octorust::{auth::Credentials, Client};
use super::types::NotificationWatcherTask;
#[napi]
pub struct GitHubWatcher {
last_read_ts: u64,
since: i64,
user_id: String,
room_id: String,
participating: bool,
github: Client,
}
#[napi]
impl GitHubWatcher {
#[napi(constructor)]
pub fn new(token: String, user_id: String, room_id: String, since: Option<i64>, participating: Option<bool>) -> Self {
GitHubWatcher {
last_read_ts: 0,
user_id: user_id,
room_id: room_id,
since: since.unwrap_or(0),
participating: participating.unwrap_or(false),
github: Client::new(
String::from("matrix-hookshot/1.0.0"),
Credentials::Token(token),
)
}
}
}
#[napi]
impl NotificationWatcherTask for GitHubWatcher {
fn start(interval_ms: usize, scheduler: &Scheduler) {
}
fn stop() {
todo!()
}
}

2
src/notifications/mod.rs Normal file
View File

@ -0,0 +1,2 @@
mod types;
mod github_watcher;

View File

@ -0,0 +1,5 @@
use clokwerk::{Scheduler};
pub trait NotificationWatcherTask {
fn start(interval_ms: usize, scheduler: &Scheduler);
fn stop();
}