Use 'Connections' to define bridges now

This commit is contained in:
Will Hunt 2020-07-19 18:58:07 +01:00
parent 5a1a1606bd
commit a42524404a
8 changed files with 597 additions and 408 deletions

View File

@ -1,15 +0,0 @@
import { MatrixEvent } from "./MatrixEvent";
export const BRIDGE_STATE_TYPE = "uk.half-shot.matrix-github.bridge";
interface BridgeRoomStateContent {
org: string;
repo: string;
state: string;
issues: string[];
comments_processed: number;
}
export interface IBridgeRoomState extends MatrixEvent<BridgeRoomStateContent> {
state_key: string;
}

View File

@ -26,8 +26,27 @@ interface IMatrixCommentEvent {
}
export class CommentProcessor {
private processedComments = new Set<string>();
private processedEvents = new Set<string>();
constructor(private as: Appservice, private mediaUrl: string) {}
public hasCommentBeenProcessed(org: string, repo: string, issue: string, id: number) {
return this.processedComments.has(`${org}/${repo}#${issue}~${id}`.toLowerCase());
}
public markCommentAsProcessed(org: string, repo: string, issue: string, id: number) {
this.processedComments.add(`${org}/${repo}#${issue}~${id}`.toLowerCase());
}
public hasEventBeenProcessed(roomId: string, eventId: string) {
return this.processedEvents.has(`${roomId}/${eventId}`);
}
public markEventAsProcessed(roomId: string, eventId: string) {
this.processedEvents.add(`${roomId}/${eventId}`);
}
public async getCommentBodyForEvent(event: MatrixEvent<MatrixMessageContent>, asBot: boolean): Promise<string> {
let body = event.content.body;
body = await this.replaceImages(body, false);

View File

@ -0,0 +1,291 @@
import { IConnection } from "./IConnection";
import { Appservice } from "matrix-bot-sdk";
import { MatrixMessageContent, MatrixEvent } from "../MatrixEvent";
import markdown from "markdown-it";
import { UserTokenStore } from "../UserTokenStore";
import LogWrapper from "../LogWrapper";
import { CommentProcessor } from "../CommentProcessor";
import { Octokit } from "@octokit/rest";
import { MessageSenderClient } from "../MatrixSender";
import { getIntentForUser } from "../IntentUtils";
import { FormatUtil } from "../FormatUtil";
import { IWebhookEvent } from "../GithubWebhooks";
export interface GitHubIssueConnectionState {
org: string;
repo: string;
state: string;
issues: string[];
comments_processed: number;
}
const log = new LogWrapper("GitHubIssueConnection");
const md = new markdown();
interface IQueryRoomOpts {
as: Appservice;
tokenStore: UserTokenStore;
commentProcessor: CommentProcessor;
messageClient: MessageSenderClient;
octokit: Octokit;
}
/**
* Handles rooms connected to a github repo.
*/
export class GitHubIssueConnection implements IConnection {
static readonly CanonicalEventType = "uk.half-shot.matrix-github.bridge";
static readonly EventTypes = [
GitHubIssueConnection.CanonicalEventType, // Legacy event, with an awful name.
];
static readonly QueryRoomRegex = /#github_(.+)_(.+)_(\d+):.*/;
static async onQueryRoom(result: RegExpExecArray, opts: IQueryRoomOpts): Promise<any> {
const parts = result!.slice(1);
const owner = parts[0];
const repo = parts[1];
const issueNumber = parseInt(parts[2], 10);
log.info(`Fetching ${owner}/${repo}/${issueNumber}`);
let issue: Octokit.IssuesGetResponse;
try {
issue = (await opts.octokit.issues.get({
owner,
repo,
issue_number: issueNumber,
})).data;
} catch (ex) {
log.error("Failed to get issue:", ex);
throw Error("Could not find issue");
}
// URL hack so we don't need to fetch the repo itself.
const orgRepoName = issue.repository_url.substr("https://api.github.com/repos/".length);
let avatarUrl = undefined;
try {
const profile = await opts.octokit.users.getByUsername({
username: owner,
});
if (profile.data.avatar_url) {
const buffer = await opts.octokit.request(profile.data.avatar_url);
log.info(`uploading ${profile.data.avatar_url}`);
// This does exist, but headers is silly and doesn't have content-type.
// tslint:disable-next-line: no-any
const contentType = (buffer.headers as any)["content-type"];
avatarUrl = {
type: "m.room.avatar",
state_key: "",
content: {
url: await opts.as.botClient.uploadContent(
Buffer.from(buffer.data as ArrayBuffer),
contentType,
),
},
};
}
} catch (ex) {
log.info("Failed to get avatar for org:", ex);
}
return {
visibility: "public",
name: FormatUtil.formatRoomName(issue),
topic: FormatUtil.formatRoomTopic(issue),
preset: "public_chat",
initial_state: [
{
type: this.CanonicalEventType,
content: {
org: orgRepoName.split("/")[0],
repo: orgRepoName.split("/")[1],
issues: [String(issue.number)],
comments_processed: -1,
state: "open",
} as GitHubIssueConnectionState,
state_key: issue.url,
},
avatarUrl,
],
};
}
constructor(public readonly roomId: string,
private readonly as: Appservice,
private state: GitHubIssueConnectionState,
private readonly stateKey: string,
private tokenStore: UserTokenStore,
private commentProcessor: CommentProcessor,
private messageClient: MessageSenderClient,
private octokit: Octokit) { }
public isInterestedInStateEvent(eventType: string, stateKey: string) {
return GitHubIssueConnection.EventTypes.includes(eventType) && this.stateKey === stateKey;
}
public get issueNumber() {
return parseInt(this.state.issues[0], 10);
}
public get org() {
return this.state.org;
}
public get repo() {
return this.state.repo;
}
public async onCommentCreated(event: IWebhookEvent, updateState = true) {
const comment = event.comment!;
if (event.repository) {
// Delay to stop comments racing sends
await new Promise((resolve) => setTimeout(resolve, 500));
if (this.commentProcessor.hasCommentBeenProcessed(this.state.org, this.state.repo, this.state.issues[0], comment.id)) {
return;
}
}
const commentIntent = await getIntentForUser(comment.user, this.as, this.octokit);
const matrixEvent = await this.commentProcessor.getEventBodyForComment(comment, event.repository, event.issue);
await this.messageClient.sendMatrixMessage(this.roomId, matrixEvent, "m.room.message", commentIntent.userId);
if (!updateState) {
return;
}
this.state.comments_processed++;
await this.as.botIntent.underlyingClient.sendStateEvent(
this.roomId,
GitHubIssueConnection.CanonicalEventType,
this.stateKey,
this.state,
);
}
private async syncIssueState() {
log.debug("Syncing issue state for", this.roomId);
const issue = await this.octokit.issues.get({
owner: this.state.org,
repo: this.state.repo,
issue_number: this.issueNumber,
});
if (this.state.comments_processed === -1) {
// This has a side effect of creating a profile for the user.
const creator = await getIntentForUser(issue.data.user, this.as, this.octokit);
// We've not sent any messages into the room yet, let's do it!
if (issue.data.body) {
await this.messageClient.sendMatrixMessage(this.roomId, {
msgtype: "m.text",
external_url: issue.data.html_url,
body: `${issue.data.body} (${issue.data.updated_at})`,
format: "org.matrix.custom.html",
formatted_body: md.render(issue.data.body),
}, "m.room.message", creator.userId);
}
if (issue.data.pull_request) {
// Send a patch in
// ...was this intended as a request for code?
}
this.state.comments_processed = 0;
}
if (this.state.comments_processed !== issue.data.comments) {
const comments = (await this.octokit.issues.listComments({
owner: this.state.org,
repo: this.state.repo,
issue_number: this.issueNumber,
// TODO: Use since to get a subset
})).data.slice(this.state.comments_processed);
for (const comment of comments) {
await this.onCommentCreated({
comment,
action: "fake",
}, false);
this.state.comments_processed++;
}
}
if (this.state.state !== issue.data.state) {
if (issue.data.state === "closed") {
const closedUserId = this.as.getUserIdForSuffix(issue.data.closed_by.login);
await this.messageClient.sendMatrixMessage(this.roomId, {
msgtype: "m.notice",
body: `closed the ${issue.data.pull_request ? "pull request" : "issue"} at ${issue.data.closed_at}`,
external_url: issue.data.closed_by.html_url,
}, "m.room.message", closedUserId);
}
await this.as.botIntent.underlyingClient.sendStateEvent(this.roomId, "m.room.topic", "", {
topic: FormatUtil.formatRoomTopic(issue.data),
});
this.state.state = issue.data.state;
}
await this.as.botIntent.underlyingClient.sendStateEvent(
this.roomId,
GitHubIssueConnection.CanonicalEventType,
this.stateKey,
this.state,
);
}
public async onMatrixIssueComment(event: MatrixEvent<MatrixMessageContent>, allowEcho: boolean = false) {
const clientKit = await this.tokenStore.getOctokitForUser(event.sender);
if (clientKit === null) {
log.info("Ignoring comment, user is not authenticated");
return;
}
const result = await clientKit.issues.createComment({
repo: this.state.repo,
owner: this.state.org,
body: await this.commentProcessor.getCommentBodyForEvent(event, false),
issue_number: parseInt(this.state.issues[0], 10),
});
if (!allowEcho) {
this.commentProcessor.markCommentAsProcessed(this.state.org, this.state.repo, this.state.issues[0], result.data.id);
}
}
public async onIssueEdited(event: IWebhookEvent) {
if (!event.changes) {
log.debug("No changes given");
return; // No changes made.
}
if (event.changes.title) {
await this.as.botIntent.underlyingClient.sendStateEvent(this.roomId, "m.room.name", "", {
name: FormatUtil.formatRoomName(event.issue!),
});
}
}
public onIssueStateChange(event: IWebhookEvent) {
return this.syncIssueState();
}
public async onEvent() {
}
public async onStateUpdate() {
}
public async onMessageEvent(ev: MatrixEvent<MatrixMessageContent>) {
if (ev.content.body === '!sync') {
// Sync data.
return this.syncIssueState();
}
await this.onMatrixIssueComment(ev);
}
public toString() {
return `GitHubIssue ${this.state.org}/${this.state.repo}#${this.state.issues.join(",")}`;
}
}

View File

@ -0,0 +1,30 @@
import { IConnection } from "./IConnection";
import { Appservice } from "matrix-bot-sdk";
export interface GitHubRepoConnectionState {}
/**
* Handles rooms connected to a github repo.
*/
export class GitHubRepoConnection implements IConnection {
static readonly EventTypes = ["uk.half-shot.matrix-github.github.repo"];
constructor(public readonly roomId: string, as: Appservice, state: GitHubRepoConnectionState) {
}
public isInterestedInStateEvent(eventType: string) {
return false;
}
public async onEvent() {
}
public async onStateUpdate() {
}
public toString() {
return `GitHubRepo`;
}
}

View File

@ -0,0 +1,32 @@
import { MatrixEvent, MatrixMessageContent } from "../MatrixEvent";
import { IWebhookEvent } from "../GithubWebhooks";
export interface IConnection {
roomId: string;
/**
* When a room gets an update to it's state.
*/
onStateUpdate: (ev: any) => Promise<void>;
/**
* When a room gets any event
*/
onEvent: (ev: MatrixEvent<unknown>) => Promise<void>;
/**
* When a room gets a message event
*/
onMessageEvent?: (ev: MatrixEvent<MatrixMessageContent>) => Promise<void>;
/**
* When a comment is created on a repo
*/
onCommentCreated?: (ev: IWebhookEvent) => Promise<void>;
onIssueStateChange?: (ev: IWebhookEvent) => Promise<void>;
onIssueEdited? :(event: IWebhookEvent) => Promise<void>;
isInterestedInStateEvent: (eventType: string, stateKey: string) => boolean;
toString(): string;
}

View File

@ -1,9 +1,6 @@
import { Appservice, IAppserviceRegistration, RichRepliesPreprocessor, IRichReplyMetadata } from "matrix-bot-sdk";
import { Octokit } from "@octokit/rest";
import { createTokenAuth } from "@octokit/auth-token";
import { createAppAuth } from "@octokit/auth-app";
import markdown from "markdown-it";
import { IBridgeRoomState, BRIDGE_STATE_TYPE } from "./BridgeState";
import { BridgeConfig } from "./Config";
import { IWebhookEvent, IOAuthRequest, IOAuthTokens, NotificationsEnableEvent,
NotificationsDisableEvent } from "./GithubWebhooks";
@ -11,7 +8,6 @@ import { CommentProcessor } from "./CommentProcessor";
import { MessageQueue, createMessageQueue } from "./MessageQueue/MessageQueue";
import { AdminRoom, BRIDGE_ROOM_TYPE, AdminAccountData } from "./AdminRoom";
import { UserTokenStore } from "./UserTokenStore";
import { FormatUtil } from "./FormatUtil";
import { MatrixEvent, MatrixMemberContent, MatrixMessageContent } from "./MatrixEvent";
import LogWrapper from "./LogWrapper";
import { MessageSenderClient } from "./MatrixSender";
@ -22,36 +18,61 @@ import { MemoryStorageProvider } from "./Stores/MemoryStorageProvider";
import { NotificationProcessor } from "./NotificationsProcessor";
import { IStorageProvider } from "./Stores/StorageProvider";
import { retry } from "./PromiseUtil";
import { IConnection } from "./Connections/IConnection";
import { GitHubRepoConnection } from "./Connections/GithubRepo";
import { GitHubIssueConnection } from "./Connections/GithubIssue";
const md = new markdown();
const log = new LogWrapper("GithubBridge");
export class GithubBridge {
private octokit!: Octokit;
private as!: Appservice;
private adminRooms: Map<string, AdminRoom>;
private roomIdtoBridgeState: Map<string, IBridgeRoomState[]>;
private orgRepoIssueToRoomId: Map<string, string>;
private matrixHandledEvents: Set<string>;
private adminRooms: Map<string, AdminRoom> = new Map();
private commentProcessor!: CommentProcessor;
private notifProcessor!: NotificationProcessor;
private queue!: MessageQueue;
private tokenStore!: UserTokenStore;
private messageClient!: MessageSenderClient;
constructor(private config: BridgeConfig, private registration: IAppserviceRegistration) {
this.roomIdtoBridgeState = new Map();
this.orgRepoIssueToRoomId = new Map();
this.matrixHandledEvents = new Set();
this.adminRooms = new Map();
private connections: IConnection[] = [];
constructor(private config: BridgeConfig, private registration: IAppserviceRegistration) { }
private async createConnectionForState(roomId: string, state: MatrixEvent<any>) {
if (state.content.disabled === false) {
log.debug(`${roomId} has disabled state for ${state.type}`);
return;
}
if (GitHubRepoConnection.EventTypes.includes(state.type)) {
return new GitHubRepoConnection(roomId, this.as, state);
}
if (GitHubIssueConnection.EventTypes.includes(state.type)) {
return new GitHubIssueConnection(roomId, this.as, state.content, state.state_key || "", this.tokenStore, this.commentProcessor, this.messageClient, this.octokit);
}
return;
}
private async createConnectionsForRoomId(roomId: string): Promise<IConnection[]> {
const state = await this.as.botClient.getRoomState(roomId);
return state.map((event) => this.createConnectionForState(roomId, event)).filter((connection) => !!connection) as unknown as IConnection[];
}
private getConnectionsForGithubIssue(org: string, repo: string, issueNumber: number) {
return this.connections.filter((c) => c instanceof GitHubIssueConnection && c.org === org && c.repo === repo && c.issueNumber === issueNumber);
}
public stop() {
this.as.stop();
this.queue.stop();
}
public async start() {
this.adminRooms = new Map();
log.info('Starting up');
this.queue = createMessageQueue(this.config);
this.messageClient = new MessageSenderClient(this.queue);
// TODO: Make this generic.
const auth = {
id: parseInt(this.config.github.auth.id as string, 10),
privateKey: await fs.readFile(this.config.github.auth.privateKeyFile, "utf-8"),
@ -74,11 +95,14 @@ export class GithubBridge {
let storage: IStorageProvider;
if (this.config.queue.host && this.config.queue.port) {
log.info(`Initialising Redis storage (on ${this.config.queue.host}:${this.config.queue.port})`);
storage = new RedisStorageProvider(this.config.queue.host, this.config.queue.port);
} else {
log.info('Initialising memory storage');
storage = new MemoryStorageProvider();
}
this.notifProcessor = new NotificationProcessor(storage, this.messageClient);
this.as = new Appservice({
@ -92,7 +116,7 @@ export class GithubBridge {
this.commentProcessor = new CommentProcessor(this.as, this.config.bridge.mediaUrl);
this.tokenStore = new UserTokenStore(this.config.github.passFile || "./passkey.pem", this.as.botIntent);
this.tokenStore = new UserTokenStore(this.config.passFile || "./passkey.pem", this.as.botIntent);
await this.tokenStore.load();
this.as.on("query.room", async (roomAlias, cb) => {
@ -122,19 +146,51 @@ export class GithubBridge {
this.queue.subscribe("notifications.user.events");
this.queue.on<IWebhookEvent>("comment.created", async (msg) => {
return this.onCommentCreated(msg.data);
const connections = this.getConnectionsForGithubIssue(msg.data.repository!.owner.login, msg.data.repository!.name, msg.data.issue!.number);
connections.map(async (c) => {
try {
if (c.onCommentCreated)
await c.onCommentCreated(msg.data);
} catch (ex) {
log.warn(`Connection ${c.toString()} failed to handle comment.created:`, ex);
}
})
});
this.queue.on<IWebhookEvent>("issue.edited", async (msg) => {
return this.onIssueEdited(msg.data);
const connections = this.getConnectionsForGithubIssue(msg.data.repository!.owner.login, msg.data.repository!.name, msg.data.issue!.number);
connections.map(async (c) => {
try {
if (c.onIssueEdited)
await c.onIssueEdited(msg.data);
} catch (ex) {
log.warn(`Connection ${c.toString()} failed to handle comment.created:`, ex);
}
})
});
this.queue.on<IWebhookEvent>("issue.closed", async (msg) => {
return this.onIssueStateChange(msg.data);
const connections = this.getConnectionsForGithubIssue(msg.data.repository!.owner.login, msg.data.repository!.name, msg.data.issue!.number);
connections.map(async (c) => {
try {
if (c.onIssueStateChange)
await c.onIssueStateChange(msg.data);
} catch (ex) {
log.warn(`Connection ${c.toString()} failed to handle comment.created:`, ex);
}
})
});
this.queue.on<IWebhookEvent>("issue.reopened", async (msg) => {
return this.onIssueStateChange(msg.data);
const connections = this.getConnectionsForGithubIssue(msg.data.repository!.owner.login, msg.data.repository!.name, msg.data.issue!.number);
connections.map(async (c) => {
try {
if (c.onIssueStateChange)
await c.onIssueStateChange(msg.data);
} catch (ex) {
log.warn(`Connection ${c.toString()} failed to handle comment.created:`, ex);
}
})
});
this.queue.on<UserNotificationsEvent>("notifications.user.events", async (msg) => {
@ -165,7 +221,7 @@ export class GithubBridge {
adminRoom.clearOauthState();
await this.tokenStore.storeUserToken("github", adminRoom.userId, msg.data.access_token);
});
// Fetch all room state
let joinedRooms: string[];
while(true) {
@ -173,6 +229,7 @@ export class GithubBridge {
log.info("Connecting to homeserver and fetching joined rooms..");
joinedRooms = await this.as.botIntent.underlyingClient.getJoinedRooms();
log.info(`Found ${joinedRooms.length} rooms`);
break;
} catch (ex) {
// This is our first interaction with the homeserver, so wait if it's not ready yet.
log.warn("Failed to connect to homeserver:", ex, "retrying in 5s");
@ -182,92 +239,70 @@ export class GithubBridge {
for (const roomId of joinedRooms) {
log.info("Fetching state for " + roomId);
try {
const accountData = await this.as.botIntent.underlyingClient.getRoomAccountData(
BRIDGE_ROOM_TYPE, roomId,
);
if (accountData.type === "admin") {
const adminRoom = new AdminRoom(
roomId, accountData, this.as.botIntent, this.tokenStore, this.config,
const connections = await this.createConnectionsForRoomId(roomId);
this.connections.push(...connections);
if (connections.length === 0) {
// TODO: Refactor this to be a connection
try {
const accountData = await this.as.botIntent.underlyingClient.getRoomAccountData(
BRIDGE_ROOM_TYPE, roomId,
);
adminRoom.on("settings.changed", this.onAdminRoomSettingsChanged.bind(this));
this.adminRooms.set(roomId, adminRoom);
log.info(`${roomId} is an admin room for ${adminRoom.userId}`);
// Call this on startup to set the state
await this.onAdminRoomSettingsChanged(adminRoom, adminRoom.data);
if (accountData.type === "admin") {
const adminRoom = new AdminRoom(
roomId, accountData, this.as.botIntent, this.tokenStore, this.config,
);
adminRoom.on("settings.changed", this.onAdminRoomSettingsChanged.bind(this));
this.adminRooms.set(roomId, adminRoom);
log.info(`${roomId} is an admin room for ${adminRoom.userId}`);
// Call this on startup to set the state
await this.onAdminRoomSettingsChanged(adminRoom, adminRoom.data);
}
} catch (ex) {
log.warn(`Room ${roomId} has no connections and is not an admin room`);
}
} catch (ex) { /* this is an old style room */ }
await this.getRoomBridgeState(roomId);
} else {
log.debug(`Room ${roomId} is connected to: ${connections.join(',')}`);
}
}
await this.as.begin();
log.info("Started bridge");
}
public stop() {
this.as.stop();
this.queue.stop();
}
private async getRoomBridgeState(roomId: string, existingState?: IBridgeRoomState) {
if (this.roomIdtoBridgeState.has(roomId) && !existingState) {
return this.roomIdtoBridgeState.get(roomId)!;
}
try {
log.info("Updating state cache for " + roomId);
const state = existingState ? [existingState] : (
await this.as.botIntent.underlyingClient.getRoomState(roomId)
);
const bridgeEvents: IBridgeRoomState[] = state.filter((e: IBridgeRoomState) =>
e.type === BRIDGE_STATE_TYPE,
);
this.roomIdtoBridgeState.set(roomId, bridgeEvents);
for (const event of bridgeEvents) {
this.orgRepoIssueToRoomId.set(
`${event.content.org}/${event.content.repo}#${event.content.issues[0]}`,
roomId,
);
}
return bridgeEvents;
} catch (ex) {
log.error(`Failed to get room state for ${roomId}:` + ex);
}
return [];
}
private async onRoomInvite(roomId: string, event: MatrixEvent<MatrixMemberContent>) {
if (this.as.isNamespacedUser(event.sender)) {
/* Do not handle invites from our users */
return;
}
log.info(`Got invite roomId=${roomId} from=${event.sender} to=${event.state_key}`);
// Room joins can fail over federation
await retry(() => this.as.botIntent.joinRoom(roomId), 5);
const members = await this.as.botIntent.underlyingClient.getJoinedRoomMembers(roomId);
if (members.filter((userId) => ![this.as.botUserId, event.sender].includes(userId)).length !== 0) {
await this.messageClient.sendMatrixText(
roomId,
"This bridge currently only supports invites to 1:1 rooms",
"m.notice",
);
await this.as.botIntent.underlyingClient.leaveRoom(roomId);
return;
if (event.state_key !== this.as.botUserId) {
return this.as.botIntent.underlyingClient.kickUser(this.as.botUserId, roomId, "Bridge does not support DMing ghosts");
}
await retry(() => this.as.botIntent.joinRoom(roomId), 5);
if (event.content.is_direct) {
const data = {admin_user: event.sender, type: "admin"};
await this.as.botIntent.underlyingClient.setRoomAccountData(
BRIDGE_ROOM_TYPE, roomId, data,
);
const adminRoom = new AdminRoom(roomId, data, this.as.botIntent, this.tokenStore, this.config);
adminRoom.on("settings.changed", this.onAdminRoomSettingsChanged.bind(this));
this.adminRooms.set(
roomId,
adminRoom,
);
} else {
// This is a group room, don't add the admin settings and just sit in the room.
}
const data = {admin_user: event.sender, type: "admin"};
await this.as.botIntent.underlyingClient.setRoomAccountData(
BRIDGE_ROOM_TYPE, roomId, data,
);
const adminRoom = new AdminRoom(roomId, data, this.as.botIntent, this.tokenStore, this.config);
adminRoom.on("settings.changed", this.onAdminRoomSettingsChanged.bind(this));
this.adminRooms.set(
roomId,
adminRoom,
);
}
private async onRoomMessage(roomId: string, event: MatrixEvent<MatrixMessageContent>) {
const isOurUser = this.as.isNamespacedUser(event.sender);
if (isOurUser) {
if (this.as.isNamespacedUser(event.sender)) {
/* We ignore messages from our users */
return;
}
log.info(`Got message roomId=${roomId} from=${event.sender}`);
log.debug(event);
if (this.adminRooms.has(roomId)) {
const room = this.adminRooms.get(roomId)!;
@ -288,11 +323,12 @@ export class GithubBridge {
const issueNumber = ev.content["uk.half-shot.matrix-github.issue"]?.number;
if (splitParts && issueNumber) {
log.info(`Handling reply for ${splitParts}${issueNumber}`);
await this.onMatrixIssueComment(processedReply, {
org: splitParts[0],
repo: splitParts[1],
issues: [issueNumber.toString()],
});
const connections = this.getConnectionsForGithubIssue(splitParts[0], splitParts[1], issueNumber);
await Promise.all(connections.map(async c => {
if (c instanceof GitHubIssueConnection) {
return c.onMatrixIssueComment(processedReply);
}
}));
} else {
log.info("Missing parts!:", splitParts, issueNumber);
}
@ -309,305 +345,56 @@ export class GithubBridge {
}
}
const bridgeState = await this.getRoomBridgeState(roomId);
if (bridgeState.length === 0) {
log.info("Room has no state for bridge");
return;
for (const connection of this.connections.filter((c) => c.roomId === roomId && c.onMessageEvent)) {
try {
await connection.onMessageEvent!(event);
} catch (ex) {
log.warn(`Connection ${connection.toString()} failed to handle message:`, ex);
}
}
if (bridgeState.length > 1) {
log.error("Can't handle multiple bridges yet");
return;
}
const githubRepo = bridgeState[0].content;
log.info(`Got new request for ${githubRepo.org}${githubRepo.repo}#${githubRepo.issues.join("|")}`);
const messageEvent = event as MatrixEvent<MatrixMessageContent>;
if (messageEvent.content.body === "!sync") {
await this.syncIssueState(roomId, bridgeState[0]);
}
await this.onMatrixIssueComment(messageEvent, bridgeState[0].content);
log.debug(event);
}
private async onRoomEvent(roomId: string, event: MatrixEvent<unknown>) {
if (event.type === BRIDGE_STATE_TYPE) {
const state = event as IBridgeRoomState;
log.info(`Got new state for ${roomId}`);
await this.getRoomBridgeState(roomId, state);
// Get current state of issue.
await this.syncIssueState(roomId, state);
return;
}
}
private async getIntentForUser(user: Octokit.IssuesGetResponseUser) {
const intent = this.as.getIntentForSuffix(user.login);
const displayName = `${user.login}`;
// Verify up-to-date profile
let profile;
await intent.ensureRegistered();
try {
profile = await intent.underlyingClient.getUserProfile(intent.userId);
if (profile.displayname !== displayName || (!profile.avatar_url && user.avatar_url)) {
log.info(`${intent.userId}'s profile is out of date`);
// Also set avatar
const buffer = await this.octokit.request(user.avatar_url);
log.info(`uploading ${user.avatar_url}`);
// This does exist, but headers is silly and doesn't have content-type.
// tslint:disable-next-line: no-any
const contentType = (buffer.headers as any)["content-type"];
const mxc = await intent.underlyingClient.uploadContent(
Buffer.from(buffer.data as ArrayBuffer),
contentType,
);
await intent.underlyingClient.setAvatarUrl(mxc);
await intent.underlyingClient.setDisplayName(displayName);
}
} catch (ex) {
profile = {};
}
return intent;
}
private async syncIssueState(roomId: string, repoState: IBridgeRoomState) {
log.debug("Syncing issue state for", roomId);
const issue = await this.octokit.issues.get({
owner: repoState.content.org,
repo: repoState.content.repo,
issue_number: parseInt(repoState.content.issues[0], 10),
});
if (repoState.content.comments_processed === -1) {
// This has a side effect of creating a profile for the user.
const creator = await this.getIntentForUser(issue.data.user);
// We've not sent any messages into the room yet, let's do it!
if (issue.data.body) {
await this.messageClient.sendMatrixMessage(roomId, {
msgtype: "m.text",
external_url: issue.data.html_url,
body: `${issue.data.body} (${issue.data.updated_at})`,
format: "org.matrix.custom.html",
formatted_body: md.render(issue.data.body),
}, "m.room.message", creator.userId);
}
if (issue.data.pull_request) {
// Send a patch in
}
repoState.content.comments_processed = 0;
}
if (repoState.content.comments_processed !== issue.data.comments) {
const comments = (await this.octokit.issues.listComments({
owner: repoState.content.org,
repo: repoState.content.repo,
issue_number: parseInt(repoState.content.issues[0], 10),
// TODO: Use since to get a subset
})).data.slice(repoState.content.comments_processed);
for (const comment of comments) {
await this.onCommentCreated({
comment,
action: "fake",
}, roomId, false);
repoState.content.comments_processed++;
if (event.state_key) {
// A state update, hurrah!
const existingConnection = this.connections.find((c) => c.roomId === roomId && c.isInterestedInStateEvent(event.type, event.state_key || ""));
if (existingConnection) {
existingConnection.onStateUpdate(event);
} else {
// Is anyone interested in this state?
const connection = await this.createConnectionForState(roomId, event);
if (connection) {
log.info(`Found new state for ${roomId}`);
this.connections.push(connection);
}
}
}
if (repoState.content.state !== issue.data.state) {
if (issue.data.state === "closed") {
const closedUserId = this.as.getUserIdForSuffix(issue.data.closed_by.login);
await this.messageClient.sendMatrixMessage(roomId, {
msgtype: "m.notice",
body: `closed the ${issue.data.pull_request ? "pull request" : "issue"} at ${issue.data.closed_at}`,
external_url: issue.data.closed_by.html_url,
}, "m.room.message", closedUserId);
}
await this.as.botIntent.underlyingClient.sendStateEvent(roomId, "m.room.topic", "", {
topic: FormatUtil.formatRoomTopic(issue.data),
});
repoState.content.state = issue.data.state;
}
await this.as.botIntent.underlyingClient.sendStateEvent(
roomId,
BRIDGE_STATE_TYPE,
repoState.state_key,
repoState.content,
);
// Alas, it's just an event.
return this.connections.filter((c) => c.roomId === roomId).map((c) => c.onEvent(event))
}
private async onQueryRoom(roomAlias: string) {
log.info("Got room query request:", roomAlias);
const match = /#github_(.+)_(.+)_(\d+):.*/.exec(roomAlias);
if (!match || match.length < 4) {
throw Error("Alias is in an incorrect format");
}
const parts = match!.slice(1);
const owner = parts[0];
const repo = parts[1];
const issueNumber = parseInt(parts[2], 10);
log.info(`Fetching ${owner}/${repo}/${issueNumber}`);
let issue: Octokit.IssuesGetResponse;
try {
issue = (await this.octokit.issues.get({
owner,
repo,
issue_number: issueNumber,
})).data;
} catch (ex) {
log.error("Failed to get issue:", ex);
throw Error("Could not find issue");
}
// URL hack so we don't need to fetch the repo itself.
const orgRepoName = issue.repository_url.substr("https://api.github.com/repos/".length);
let avatarUrl = undefined;
try {
const profile = await this.octokit.users.getByUsername({
username: owner,
});
if (profile.data.avatar_url) {
const buffer = await this.octokit.request(profile.data.avatar_url);
log.info(`uploading ${profile.data.avatar_url}`);
// This does exist, but headers is silly and doesn't have content-type.
// tslint:disable-next-line: no-any
const contentType = (buffer.headers as any)["content-type"];
avatarUrl = {
type: "m.room.avatar",
state_key: "",
content: {
url: await this.as.botClient.uploadContent(
Buffer.from(buffer.data as ArrayBuffer),
contentType,
),
},
};
// Determine which type of room it is.
let res: RegExpExecArray | null;
res = GitHubIssueConnection.QueryRoomRegex.exec(roomAlias);
if (res) {
try {
return await GitHubIssueConnection.onQueryRoom(res, {
as: this.as,
tokenStore: this.tokenStore,
messageClient: this.messageClient,
commentProcessor: this.commentProcessor,
octokit: this.octokit,
});
} catch (ex) {
log.error(`Could not handle alias with GitHubIssueConnection`, ex);
throw ex;
}
} catch (ex) {
log.info("Failed to get avatar for org:", ex);
} else {
throw Error('No regex matching query pattern');
}
return {
visibility: "public",
name: FormatUtil.formatRoomName(issue),
topic: FormatUtil.formatRoomTopic(issue),
preset: "public_chat",
initial_state: [
{
type: BRIDGE_STATE_TYPE,
content: {
org: orgRepoName.split("/")[0],
repo: orgRepoName.split("/")[1],
issues: [String(issue.number)],
comments_processed: -1,
state: "open",
},
state_key: issue.url,
} as IBridgeRoomState,
avatarUrl,
],
};
}
private async onCommentCreated(event: IWebhookEvent, roomId?: string, updateState: boolean = true) {
if (!roomId) {
const issueKey = `${event.repository!.owner.login}/${event.repository!.name}#${event.issue!.number}`;
roomId = this.orgRepoIssueToRoomId.get(issueKey);
if (!roomId) {
log.debug("No room id for repo");
return;
}
}
const comment = event.comment!;
if (event.repository) {
// Delay to stop comments racing sends
await new Promise((resolve) => setTimeout(resolve, 500));
const dupeKey =
`${event.repository.owner.login}/${event.repository.name}#${event.issue!.number}~${comment.id}`
.toLowerCase();
if (this.matrixHandledEvents.has(dupeKey)) {
return;
}
}
const commentIntent = await this.getIntentForUser(comment.user);
const matrixEvent = await this.commentProcessor.getEventBodyForComment(comment, event.repository, event.issue);
await this.messageClient.sendMatrixMessage(roomId, matrixEvent, "m.room.message", commentIntent.userId);
if (!updateState) {
return;
}
const state = (await this.getRoomBridgeState(roomId))[0];
state.content.comments_processed++;
await this.as.botIntent.underlyingClient.sendStateEvent(
roomId,
BRIDGE_STATE_TYPE,
state.state_key,
state.content,
);
}
private async onIssueEdited(event: IWebhookEvent) {
if (!event.changes) {
log.debug("No changes given");
return; // No changes made.
}
const issueKey = `${event.repository!.owner.login}/${event.repository!.name}#${event.issue!.number}`;
const roomId = this.orgRepoIssueToRoomId.get(issueKey)!;
const roomState = await this.getRoomBridgeState(roomId);
if (!roomId || !roomState) {
log.debug("No tracked room state");
return;
}
if (event.changes.title) {
await this.as.botIntent.underlyingClient.sendStateEvent(roomId, "m.room.name", "", {
name: FormatUtil.formatRoomName(event.issue!),
});
}
}
private async onIssueStateChange(event: IWebhookEvent) {
const issueKey = `${event.repository!.owner.login}/${event.repository!.name}#${event.issue!.number}`;
const roomId = this.orgRepoIssueToRoomId.get(issueKey)!;
const roomState = await this.getRoomBridgeState(roomId);
if (!roomId || !roomState || roomState.length === 0) {
log.debug("No tracked room state");
return;
}
log.debug(roomState);
await this.syncIssueState(roomId, roomState[0]);
}
private async onMatrixIssueComment(event: MatrixEvent<MatrixMessageContent>,
bridgeState: {repo: string, org: string, issues: string[]},
allowEcho: boolean = false) {
const clientKit = await this.getOctokitForUser(event.sender);
if (clientKit === null) {
log.info("Ignoring comment, user is not authenticated");
return;
}
const result = await clientKit.issues.createComment({
repo: bridgeState.repo,
owner: bridgeState.org,
body: await this.commentProcessor.getCommentBodyForEvent(event, false),
issue_number: parseInt(bridgeState.issues[0], 10),
});
if (allowEcho) {
return;
}
const key =
`${bridgeState.org}/${bridgeState.repo}#${bridgeState.issues[0]}~${result.data.id}`
.toLowerCase();
this.matrixHandledEvents.add(key);
}
private async onAdminRoomSettingsChanged(adminRoom: AdminRoom, settings: AdminAccountData) {
@ -641,16 +428,4 @@ export class GithubBridge {
});
}
}
private async getOctokitForUser(userId: string) {
const senderToken = await this.tokenStore.getUserToken("github", userId);
if (!senderToken) {
return null;
}
return new Octokit({
authStrategy: createTokenAuth,
auth: senderToken,
userAgent: "matrix-github v0.0.1",
});
}
}

41
src/IntentUtils.ts Normal file
View File

@ -0,0 +1,41 @@
import LogWrapper from "./LogWrapper";
import { Octokit } from "@octokit/rest";
import { Appservice } from "matrix-bot-sdk";
const log = new LogWrapper("IntentUtils");
export async function getIntentForUser(user: Octokit.IssuesGetResponseUser, as: Appservice, octokit: Octokit) {
const intent = as.getIntentForSuffix(user.login);
const displayName = `${user.login}`;
// Verify up-to-date profile
let profile;
await intent.ensureRegistered();
try {
profile = await intent.underlyingClient.getUserProfile(intent.userId);
} catch (ex) {
profile = {};
}
if (profile.displayname !== displayName) {
log.debug(`Updating ${intent.userId}'s displayname`);
log.info(`${intent.userId}'s profile is out of date`);
await intent.underlyingClient.setDisplayName(displayName);
}
if (!profile.avatar_url && user.avatar_url) {
log.debug(`Updating ${intent.userId}'s avatar`);
const buffer = await octokit.request(user.avatar_url);
log.info(`uploading ${user.avatar_url}`);
// This does exist, but headers is silly and doesn't have content-type.
// tslint:disable-next-line: no-any
const contentType = (buffer.headers as any)["content-type"];
const mxc = await intent.underlyingClient.uploadContent(
Buffer.from(buffer.data as ArrayBuffer),
contentType,
);
await intent.underlyingClient.setAvatarUrl(mxc);
}
return intent;
}

View File

@ -2,6 +2,8 @@ import { Intent } from "matrix-bot-sdk";
import { promises as fs } from "fs";
import { publicEncrypt, privateDecrypt } from "crypto";
import LogWrapper from "./LogWrapper";
import { Octokit } from "@octokit/rest";
import { createTokenAuth } from "@octokit/auth-token";
const ACCOUNT_DATA_TYPE = "uk.half-shot.matrix-github.password-store:";
const ACCOUNT_DATA_GITLAB_TYPE = "uk.half-shot.matrix-github.gitlab.password-store:";
@ -15,6 +17,7 @@ export class UserTokenStore {
}
public async load() {
log.info(`Loading token key file ${this.keyPath}`);
this.key = await fs.readFile(this.keyPath);
}
@ -48,4 +51,17 @@ export class UserTokenStore {
}
return null;
}
public async getOctokitForUser(userId: string) {
// TODO: Move this somewhere else.
const senderToken = await this.getUserToken("github", userId);
if (!senderToken) {
return null;
}
return new Octokit({
authStrategy: createTokenAuth,
auth: senderToken,
userAgent: "matrix-github v0.0.1",
});
}
}