mirror of
https://github.com/matrix-org/matrix-hookshot.git
synced 2025-03-10 21:19:13 +00:00
Support removing feed connections, deduplicate feed URLs
This commit is contained in:
parent
c2f97825f7
commit
a62235e925
@ -426,6 +426,7 @@ export class ConnectionManager extends EventEmitter {
|
|||||||
}
|
}
|
||||||
this.connections.splice(connectionIndex, 1);
|
this.connections.splice(connectionIndex, 1);
|
||||||
Metrics.connections.set(this.connections.length);
|
Metrics.connections.set(this.connections.length);
|
||||||
|
this.emit('connection-removed', connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -3,7 +3,6 @@ import { IConnection, IConnectionState } from ".";
|
|||||||
import { BridgeConfigFeeds } from "../Config/Config";
|
import { BridgeConfigFeeds } from "../Config/Config";
|
||||||
import { FeedEntry, FeedError} from "../feeds/FeedReader";
|
import { FeedEntry, FeedError} from "../feeds/FeedReader";
|
||||||
import LogWrapper from "../LogWrapper";
|
import LogWrapper from "../LogWrapper";
|
||||||
import { GetConnectionsResponseItem } from "../provisioning/api";
|
|
||||||
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
|
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
|
||||||
import { BaseConnection } from "./BaseConnection";
|
import { BaseConnection } from "./BaseConnection";
|
||||||
import markdown from "markdown-it";
|
import markdown from "markdown-it";
|
||||||
@ -33,11 +32,11 @@ export class FeedConnection extends BaseConnection implements IConnection {
|
|||||||
private readonly storage: IBridgeStorageProvider
|
private readonly storage: IBridgeStorageProvider
|
||||||
) {
|
) {
|
||||||
super(roomId, stateKey, FeedConnection.CanonicalEventType)
|
super(roomId, stateKey, FeedConnection.CanonicalEventType)
|
||||||
log.info(`FeedConnection created for ${roomId}, ${JSON.stringify(state)}`);
|
log.info(`Connection ${this.connectionId} created for ${roomId}, ${JSON.stringify(state)}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
public isInterestedInStateEvent(eventType: string, stateKey: string): boolean {
|
public isInterestedInStateEvent(eventType: string, stateKey: string): boolean {
|
||||||
return false;
|
return !!FeedConnection.EventTypes.find(e => e === eventType) && stateKey === this.feedUrl;
|
||||||
}
|
}
|
||||||
|
|
||||||
public async handleFeedEntry(entry: FeedEntry): Promise<void> {
|
public async handleFeedEntry(entry: FeedEntry): Promise<void> {
|
||||||
@ -62,6 +61,11 @@ export class FeedConnection extends BaseConnection implements IConnection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// needed to ensure that the connection is removable
|
||||||
|
public async onRemove(): Promise<void> {
|
||||||
|
log.info(`Removing connection ${this.connectionId}`);
|
||||||
|
}
|
||||||
|
|
||||||
toString(): string {
|
toString(): string {
|
||||||
return `FeedConnection ${this.state.url}`;
|
return `FeedConnection ${this.state.url}`;
|
||||||
}
|
}
|
||||||
|
@ -50,8 +50,16 @@ function stripHtml(input: string): string {
|
|||||||
return input.replace(/<[^>]*?>/g, '');
|
return input.replace(/<[^>]*?>/g, '');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function normalizeUrl(input: string): string {
|
||||||
|
const url = new URL(input);
|
||||||
|
url.hash = '';
|
||||||
|
return url.toString();
|
||||||
|
}
|
||||||
|
|
||||||
export class FeedReader {
|
export class FeedReader {
|
||||||
private observedFeedUrls: string[];
|
private connections: FeedConnection[];
|
||||||
|
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
|
||||||
|
private observedFeedUrls: Set<string> = new Set();
|
||||||
private seenEntries: Map<string, string[]> = new Map();
|
private seenEntries: Map<string, string[]> = new Map();
|
||||||
static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";
|
static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";
|
||||||
|
|
||||||
@ -61,12 +69,21 @@ export class FeedReader {
|
|||||||
private queue: MessageQueue,
|
private queue: MessageQueue,
|
||||||
private matrixClient: MatrixClient,
|
private matrixClient: MatrixClient,
|
||||||
) {
|
) {
|
||||||
const feedConnections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
|
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
|
||||||
this.observedFeedUrls = feedConnections.map(c => c.feedUrl);
|
this.calculateFeedUrls();
|
||||||
connectionManager.on('new-connection', c => {
|
connectionManager.on('new-connection', c => {
|
||||||
if (c instanceof FeedConnection) {
|
if (c instanceof FeedConnection) {
|
||||||
log.info('New connection tracked:', c.feedUrl);
|
log.info('New connection tracked:', c.connectionId);
|
||||||
this.observedFeedUrls.push(c.feedUrl);
|
this.connections.push(c);
|
||||||
|
this.calculateFeedUrls();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
connectionManager.on('connection-removed', removed => {
|
||||||
|
if (removed instanceof FeedConnection) {
|
||||||
|
log.info('Connections before removal:', this.connections.map(c => c.connectionId));
|
||||||
|
this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId);
|
||||||
|
log.info('Connections after removal:', this.connections.map(c => c.connectionId));
|
||||||
|
this.calculateFeedUrls();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -77,6 +94,19 @@ export class FeedReader {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private calculateFeedUrls(): void {
|
||||||
|
// just in case we got an invalid URL somehow
|
||||||
|
const normalizedUrls = [];
|
||||||
|
for (const conn of this.connections) {
|
||||||
|
try {
|
||||||
|
normalizedUrls.push(normalizeUrl(conn.feedUrl));
|
||||||
|
} catch (err: unknown) {
|
||||||
|
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.observedFeedUrls = new Set(normalizedUrls);
|
||||||
|
}
|
||||||
|
|
||||||
private async loadSeenEntries(): Promise<void> {
|
private async loadSeenEntries(): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const accountData = await this.matrixClient.getAccountData<any>(FeedReader.seenEntriesEventType).catch((err: any) => {
|
const accountData = await this.matrixClient.getAccountData<any>(FeedReader.seenEntriesEventType).catch((err: any) => {
|
||||||
@ -108,13 +138,13 @@ export class FeedReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async pollFeeds(): Promise<void> {
|
private async pollFeeds(): Promise<void> {
|
||||||
log.debug(`Checking for updates in ${this.observedFeedUrls.length} RSS/Atom feeds`);
|
log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds`);
|
||||||
|
|
||||||
let seenEntriesChanged = false;
|
let seenEntriesChanged = false;
|
||||||
|
|
||||||
const fetchingStarted = (new Date()).getTime();
|
const fetchingStarted = (new Date()).getTime();
|
||||||
|
|
||||||
for (const url of this.observedFeedUrls) {
|
for (const url of this.observedFeedUrls.values()) {
|
||||||
try {
|
try {
|
||||||
const res = await axios.get(url.toString());
|
const res = await axios.get(url.toString());
|
||||||
const feed = await (new Parser()).parseString(res.data);
|
const feed = await (new Parser()).parseString(res.data);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user