mirror of
https://github.com/matrix-org/matrix-hookshot.git
synced 2025-03-10 21:19:13 +00:00
Handle github API errors
This commit is contained in:
parent
0d1127e139
commit
cc7ca46b33
@ -1,6 +1,7 @@
|
||||
import { NotificationsEnableEvent } from "./GithubWebhooks";
|
||||
import { Octokit } from "@octokit/rest";
|
||||
import { createTokenAuth } from "@octokit/auth-token";
|
||||
import { RequestError } from "@octokit/request-error";
|
||||
import { LogWrapper } from "./LogWrapper";
|
||||
import { MessageQueue } from "./MessageQueue/MessageQueue";
|
||||
import { MessageSenderClient } from "./MatrixSender";
|
||||
@ -22,7 +23,7 @@ export interface UserNotificationsEvent {
|
||||
|
||||
export interface UserNotification {
|
||||
id: string;
|
||||
reason: "assign"|"author"|"comment"|"invitation"|"manual"|"mention"|"review_required"|"review_requested"|
|
||||
reason: "assign"|"author"|"comment"|"invitation"|"manual"|"mention"|"review_requested"|
|
||||
"security_alert"|"state_change"|"subscribed"|"team_mention";
|
||||
unread: boolean;
|
||||
updated_at: number;
|
||||
@ -36,39 +37,71 @@ export interface UserNotification {
|
||||
// Probably.
|
||||
url_data?: Octokit.IssuesGetResponse;
|
||||
latest_comment_url_data?: Octokit.IssuesGetCommentResponse;
|
||||
requested_reviewers?: Octokit.PullsListReviewRequestsResponse;
|
||||
reviews?: Octokit.PullsListReviewsResponse;
|
||||
};
|
||||
repository: Octokit.ActivityGetThreadResponseRepository;
|
||||
}
|
||||
|
||||
const MIN_INTERVAL_MS = 15000;
|
||||
const FAILURE_THRESHOLD = 50;
|
||||
const GH_API_THRESHOLD = 50;
|
||||
const GH_API_RETRY_IN = 1000 * 60;
|
||||
|
||||
const log = new LogWrapper("UserNotificationWatcher");
|
||||
|
||||
export class UserNotificationWatcher {
|
||||
private userIntervals: Map<string, NodeJS.Timeout> = new Map();
|
||||
private matrixMessageSender: MessageSenderClient;
|
||||
private apiFailureCount: number = 0;
|
||||
private globalRetryIn: number = 0;
|
||||
|
||||
constructor(private queue: MessageQueue) {
|
||||
this.matrixMessageSender = new MessageSenderClient(queue);
|
||||
}
|
||||
|
||||
public checkGitHubStatus() {
|
||||
this.apiFailureCount = Math.min(this.apiFailureCount + 1, GH_API_THRESHOLD);
|
||||
if (this.apiFailureCount < GH_API_THRESHOLD) {
|
||||
log.warn(`API Failure count at ${this.apiFailureCount}`);
|
||||
return;
|
||||
}
|
||||
// The API is actively failing.
|
||||
if (this.globalRetryIn > 0) {
|
||||
this.globalRetryIn = Date.now() + GH_API_RETRY_IN;
|
||||
}
|
||||
log.warn(`API Failure limit reached, holding off new requests for ${GH_API_RETRY_IN / 1000}s`);
|
||||
}
|
||||
|
||||
public start() {
|
||||
// No-op
|
||||
}
|
||||
|
||||
public async fetchUserNotifications(stream: UserStream) {
|
||||
if (this.globalRetryIn !== 0 && this.globalRetryIn > Date.now()) {
|
||||
log.info(`Not getting notifications for ${stream.userId}, API is still down.`);
|
||||
return stream;
|
||||
}
|
||||
log.info(`Getting notifications for ${stream.userId} ${stream.lastReadTs}`);
|
||||
const since = stream.lastReadTs !== 0 ? `&since=${new Date(stream.lastReadTs).toISOString()}`: "";
|
||||
let response: Octokit.AnyResponse;
|
||||
try {
|
||||
const since = stream.lastReadTs !== 0 ? `&since=${new Date(stream.lastReadTs).toISOString()}`: "";
|
||||
const response = await stream.octoKit.request(
|
||||
response = await stream.octoKit.request(
|
||||
`/notifications?participating=${stream.participating}${since}`,
|
||||
);
|
||||
log.info(`Got ${response.data.length} notifications`);
|
||||
stream.lastReadTs = Date.now();
|
||||
const events: UserNotification[] = [];
|
||||
// We were succesful, clear any timeouts.
|
||||
this.globalRetryIn = 0;
|
||||
// To avoid a bouncing issue, gradually reduce the failure count.
|
||||
this.apiFailureCount = Math.max(0, this.apiFailureCount - 2);
|
||||
} catch (ex) {
|
||||
await this.handleGitHubFailure(stream, ex);
|
||||
return stream;
|
||||
}
|
||||
log.info(`Got ${response.data.length} notifications`);
|
||||
stream.lastReadTs = Date.now();
|
||||
const events: UserNotification[] = [];
|
||||
|
||||
for (const rawEvent of response.data as UserNotification[]) {
|
||||
for (const rawEvent of response.data as UserNotification[]) {
|
||||
try {
|
||||
await (async () => {
|
||||
if (rawEvent.subject.url) {
|
||||
@ -79,6 +112,18 @@ export class UserNotificationWatcher {
|
||||
const res = await stream.octoKit.request(rawEvent.subject.latest_comment_url);
|
||||
rawEvent.subject.latest_comment_url_data = res.data;
|
||||
}
|
||||
if (rawEvent.reason === "review_requested") {
|
||||
rawEvent.subject.requested_reviewers = (await stream.octoKit.pulls.listReviewRequests({
|
||||
pull_number: rawEvent.subject.url_data?.number!,
|
||||
owner: rawEvent.repository.owner.login,
|
||||
repo: rawEvent.repository.name,
|
||||
})).data;
|
||||
rawEvent.subject.reviews = (await stream.octoKit.pulls.listReviews({
|
||||
pull_number: rawEvent.subject.url_data?.number!,
|
||||
owner: rawEvent.repository.owner.login,
|
||||
repo: rawEvent.repository.name,
|
||||
})).data;
|
||||
}
|
||||
events.push(rawEvent);
|
||||
})();
|
||||
} catch (ex) {
|
||||
@ -88,32 +133,38 @@ export class UserNotificationWatcher {
|
||||
}
|
||||
}
|
||||
|
||||
if (events.length > 0) {
|
||||
await this.queue.push<UserNotificationsEvent>({
|
||||
eventName: "notifications.user.events",
|
||||
data: {
|
||||
roomId: stream.roomId,
|
||||
events,
|
||||
lastReadTs: stream.lastReadTs,
|
||||
},
|
||||
sender: "GithubWebhooks",
|
||||
});
|
||||
}
|
||||
if (events.length > 0) {
|
||||
await this.queue.push<UserNotificationsEvent>({
|
||||
eventName: "notifications.user.events",
|
||||
data: {
|
||||
roomId: stream.roomId,
|
||||
events,
|
||||
lastReadTs: stream.lastReadTs,
|
||||
},
|
||||
sender: "GithubWebhooks",
|
||||
});
|
||||
}
|
||||
return stream;
|
||||
}
|
||||
|
||||
} catch (ex) {
|
||||
public handleGitHubFailure(stream: UserStream, ex: RequestError) {
|
||||
log.error("An error occured getting notifications:", ex);
|
||||
if (ex.status === 401 || ex.status === 404) {
|
||||
log.warn(`Got status ${ex.status} when handing user stream: ${ex.message}`);
|
||||
stream.failureCount++;
|
||||
log.error("An error occured getting notifications:", ex);
|
||||
} else if (ex.status >= 500) {
|
||||
setImmediate(() => this.checkGitHubStatus());
|
||||
}
|
||||
|
||||
if (stream.failureCount > FAILURE_THRESHOLD) {
|
||||
this.removeUser(stream.userId);
|
||||
await this.matrixMessageSender.sendMatrixText(
|
||||
return this.matrixMessageSender.sendMatrixText(
|
||||
stream.roomId,
|
||||
`The bridge has been unable to process your notification stream for some time, and has disabled notifications.
|
||||
Check your GitHub token is still valid, and then turn notifications back on.`, "m.notice",
|
||||
);
|
||||
}
|
||||
return stream;
|
||||
return null;
|
||||
}
|
||||
|
||||
public removeUser(userId: string) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user