diff --git a/src/UserNotificationWatcher.ts b/src/UserNotificationWatcher.ts index ebb06caf..d0b7e815 100644 --- a/src/UserNotificationWatcher.ts +++ b/src/UserNotificationWatcher.ts @@ -18,6 +18,7 @@ export interface UserNotificationsEvent { } export interface UserNotification { + id: string; reason: "assign"|"author"|"comment"|"invitation"|"manual"|"mention"|"review_required"|"security_alert"|"state_change"|"subscribed"|"team_mention"; unread: boolean; updated_at: number; @@ -79,17 +80,26 @@ export class UserNotificationWatcher { const since = stream.lastReadTs !== 0 ? `&since=${new Date(stream.lastReadTs).toISOString()}`: ""; const response = await stream.octoKit.request(`/notifications?participating=true${since}`); stream.lastReadTs = Date.now(); - const events: UserNotification[] = await Promise.all(response.data.map(async (event: UserNotification) => { - if (event.subject.url) { - const res = await stream.octoKit.request(event.subject.url); - event.subject.url_data = res.data; + const events: UserNotification[] = []; + for (const rawEvent of response.data as UserNotification[]) { + try { + await (async () => { + if (rawEvent.subject.url) { + const res = await stream.octoKit.request(rawEvent.subject.url); + rawEvent.subject.url_data = res.data; + } + if (rawEvent.subject.latest_comment_url) { + const res = await stream.octoKit.request(rawEvent.subject.latest_comment_url); + rawEvent.subject.latest_comment_url_data = res.data; + } + events.push(rawEvent); + })(); + } catch (ex) { + log.warn(`Failed to pre-process ${rawEvent.id}: ${ex}`); + // If it fails, we can just push the raw thing. + events.push(rawEvent); } - if (event.subject.latest_comment_url) { - const res = await stream.octoKit.request(event.subject.latest_comment_url); - event.subject.latest_comment_url_data = res.data; - } - return event; - })); + } this.queue.push({ eventName: "notifications.user.events", data: { @@ -104,28 +114,33 @@ export class UserNotificationWatcher { } this.userQueue.push(userId); } - } - removeUser(userId: string) { + public removeUser(userId: string) { this.userStreams.delete(userId); log.info(`Removed ${userId} to notif queue`); } - addUser(data: NotificationsEnableEvent) { + public addUser(data: NotificationsEnableEvent) { const clientKit = new Octokit({ authStrategy: createTokenAuth, auth: data.token, userAgent: "matrix-github v0.0.1", }); + const existing = this.userStreams.has(data.user_id); + this.userStreams.set(data.user_id, { octoKit: clientKit, userId: data.user_id, roomId: data.room_id, lastReadTs: data.since, }); - this.userQueue.push(data.user_id); - log.info(`Added ${data.user_id} to notif queue`); + if (!existing) { + log.info(`Inserted ${data.user_id} into the notif queue`); + this.userQueue.push(data.user_id); + } else { + log.info(`Reinserted ${data.user_id} into the notif queue`); + } } }