Mirror of https://github.com/matrix-org/matrix-hookshot.git
Staggered RSS feed polling (#685)
* Ensure we poll feeds equally
* Ensure we poll at least once
* Create 685.misc
* Tidy up
parent 507b014bd5
commit 44eea7f7c3
changelog.d/685.misc (new file, +1 line)
@@ -0,0 +1 @@
+Stagger RSS feed polling over the interval period, rather than attempting to poll all feeds at once. Should reduce memory / CPU spikes.
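
The shape of the fix is easier to see outside the diff. Below is a minimal, self-contained sketch of the staggering approach, reusing the names the commit introduces (feedQueue, sleepingInterval, pollIntervalSeconds); the StaggeredPoller wrapper and the pollOne callback are illustrative stand-ins, not hookshot's actual API.

// Sketch only: poll one feed per tick, spacing ticks so the whole queue
// is covered once per configured interval instead of all at once.
class StaggeredPoller {
    private feedQueue: string[];

    constructor(
        feedUrls: string[],
        private readonly pollIntervalSeconds: number,
        private readonly pollOne: (url: string) => Promise<void>, // stand-in for the real fetch
    ) {
        this.feedQueue = [...feedUrls];
    }

    // The interval is divided across the queue, so each feed is still
    // visited once per full interval.
    get sleepingInterval(): number {
        return (this.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1);
    }

    async tick(): Promise<void> {
        const [url] = this.feedQueue.splice(0, 1);
        if (url) {
            await this.pollOne(url);
            this.feedQueue.push(url); // re-queue the feed for the next round
        }
        setTimeout(() => void this.tick(), this.sleepingInterval);
    }
}

// Usage (illustrative):
//   const poller = new StaggeredPoller(urls, 600, fetchAndDispatch);
//   void poller.tick();

The commit also shuffles the queue up front, so feeds are spread randomly across the interval rather than always polled in configuration order.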
src/Connections/FeedConnection.ts

@@ -6,11 +6,9 @@ import { FeedEntry, FeedError, FeedReader} from "../feeds/FeedReader";
 import { Logger } from "matrix-appservice-bridge";
 import { IBridgeStorageProvider } from "../Stores/StorageProvider";
 import { BaseConnection } from "./BaseConnection";
 import axios from "axios";
 import markdown from "markdown-it";
 import { Connection, ProvisionConnectionOpts } from "./IConnection";
 import { GetConnectionsResponseItem } from "../provisioning/api";
 import { StatusCodes } from "http-status-codes";
 const log = new Logger("FeedConnection");
 const md = new markdown();

src/feeds/FeedReader.ts

@@ -1,4 +1,4 @@
-import { MatrixClient } from "matrix-bot-sdk";
+import { MatrixClient, MatrixError } from "matrix-bot-sdk";
 import { BridgeConfigFeeds } from "../Config/Config";
 import { ConnectionManager } from "../ConnectionManager";
 import { FeedConnection } from "../Connections";
@@ -88,12 +88,29 @@ function normalizeUrl(input: string): string {
     return url.toString();
 }
+
+function shuffle<T>(array: T[]): T[] {
+    for (let i = array.length - 1; i > 0; i--) {
+        const j = Math.floor(Math.random() * (i + 1));
+        [array[i], array[j]] = [array[j], array[i]];
+    }
+    return array;
+}
+
+interface FeedItem {
+    title?: string;
+    link?: string;
+    id?: string;
+}
+
 export class FeedReader {
     private readonly parser = FeedReader.buildParser();

     private connections: FeedConnection[];
     // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
     private observedFeedUrls: Set<string> = new Set();

+    private feedQueue: string[] = [];
+
     private seenEntries: Map<string, string[]> = new Map();
     // A set of last modified times for each url.
     private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -107,6 +124,10 @@ export class FeedReader {
     private shouldRun = true;
     private timeout?: NodeJS.Timeout;

+    get sleepingInterval() {
+        return (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1);
+    }
+
     constructor(
         private readonly config: BridgeConfigFeeds,
         private readonly connectionManager: ConnectionManager,
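
To make the arithmetic in the new sleepingInterval getter concrete (the numbers here are illustrative, not from the commit):

// 600s poll interval split evenly across 100 queued feeds:
const pollIntervalSeconds = 600;
const queueLength = 100;
const sleepingIntervalMs = (pollIntervalSeconds * 1000) / (queueLength || 1);
console.log(sleepingIntervalMs); // 6000 -> one feed polled every 6s

Each feed is still polled once per ten-minute interval; the `|| 1` guard avoids dividing by zero when no feeds are configured.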
@@ -152,20 +173,22 @@ export class FeedReader {
             }
         }
         this.observedFeedUrls = new Set(normalizedUrls);
+        this.feedQueue = shuffle([...this.observedFeedUrls.values()]);
+
         Metrics.feedsCount.set(this.observedFeedUrls.size);
     }

     private async loadSeenEntries(): Promise<void> {
         try {
-            const accountData = await this.matrixClient.getAccountData<any>(FeedReader.seenEntriesEventType).catch((err: any) => {
-                if (err.statusCode === 404) {
-                    return {};
+            const accountData = await this.matrixClient.getAccountData<AccountData>(FeedReader.seenEntriesEventType).catch((err: MatrixError|unknown) => {
+                if (err instanceof MatrixError && err.statusCode === 404) {
+                    return {} as AccountData;
                 } else {
                     throw err;
                 }
             });
             if (!validateAccountData(accountData)) {
-                const errors = validateAccountData.errors!.map(e => `${e.instancePath} ${e.message}`);
+                const errors = validateAccountData.errors?.map(e => `${e.instancePath} ${e.message}`) || ['No error reported'];
                 throw new Error(`Invalid account data: ${errors.join(', ')}`);
             }
             for (const url in accountData) {
@@ -191,10 +214,10 @@ export class FeedReader {

     public static async fetchFeed(
         url: string,
-        headers: any,
+        headers: Record<string, string>,
         timeoutMs: number,
         parser: Parser = FeedReader.buildParser(),
-    ): Promise<{ response: AxiosResponse<any, any>, feed: Parser.Output<any> }> {
+    ): Promise<{ response: AxiosResponse, feed: Parser.Output<FeedItem> }> {
         const response = await axios.get(url, {
             headers: {
                 'User-Agent': UserAgent,
@@ -207,133 +230,148 @@ export class FeedReader {
         return { response, feed };
     }

+    /**
+     * Poll a given feed URL for data, pushing any entries found into the message queue.
+     * We also check the `cacheTimes` cache to see if the feed has recent entries that we can
+     * filter out.
+     *
+     * @param url The URL to be polled.
+     * @returns A boolean that returns if we saw any changes on the feed since the last poll time.
+     */
+    private async pollFeed(url: string): Promise<boolean> {
+        let seenEntriesChanged = false;
+        const fetchKey = randomUUID();
+        const { etag, lastModified } = this.cacheTimes.get(url) || {};
+        log.debug(`Checking for updates in ${url} (${etag ?? lastModified})`);
+        try {
+            const { response, feed } = await FeedReader.fetchFeed(
+                url,
+                {
+                    ...(lastModified && { 'If-Modified-Since': lastModified}),
+                    ...(etag && { 'If-None-Match': etag}),
+                },
+                // We don't want to wait forever for the feed.
+                this.config.pollTimeoutSeconds * 1000,
+                this.parser,
+            );
+
+            // Store any entity tags/cache times.
+            if (response.headers.ETag) {
+                this.cacheTimes.set(url, { etag: response.headers.ETag});
+            } else if (response.headers['Last-Modified']) {
+                this.cacheTimes.set(url, { lastModified: response.headers['Last-Modified'] });
+            }
+
+            let initialSync = false;
+            let seenGuids = this.seenEntries.get(url);
+            if (!seenGuids) {
+                initialSync = true;
+                seenGuids = [];
+                seenEntriesChanged = true; // to ensure we only treat it as an initialSync once
+            }
+
+            // migrate legacy, cleartext guids to their md5-hashed counterparts
+            seenGuids = seenGuids.map(guid => guid.startsWith('md5:') ? guid : this.hashGuid(guid));
+
+            const seenGuidsSet = new Set(seenGuids);
+            const newGuids = [];
+            log.debug(`Found ${feed.items.length} entries in ${url}`);
+
+            for (const item of feed.items) {
+                // Find the first guid-like that looks like a string.
+                // Some feeds have a nasty habit of leading an empty tag there, making us parse it as garbage.
+                const guid = [item.guid, item.id, item.link, item.title].find(id => typeof id === 'string' && id);
+                if (!guid) {
+                    log.error(`Could not determine guid for entry in ${url}, skipping`);
+                    continue;
+                }
+                const hashedGuid = this.hashGuid(guid);
+                newGuids.push(hashedGuid);
+
+                if (initialSync) {
+                    log.debug(`Skipping entry ${guid} since we're performing an initial sync`);
+                    continue;
+                }
+                if (seenGuidsSet.has(hashedGuid)) {
+                    log.debug('Skipping already seen entry', guid);
+                    continue;
+                }
+
+                const entry = {
+                    feed: {
+                        title: feed.title ? stripHtml(feed.title) : null,
+                        url: url,
+                    },
+                    title: item.title ? stripHtml(item.title) : null,
+                    link: item.link || null,
+                    fetchKey
+                };
+
+                log.debug('New entry:', entry);
+                seenEntriesChanged = true;
+
+                this.queue.push<FeedEntry>({ eventName: 'feed.entry', sender: 'FeedReader', data: entry });
+            }
+
+            if (seenEntriesChanged) {
+                // Some RSS feeds can return a very small number of items then bounce
+                // back to their "normal" size, so we cannot just clobber the recent GUID list per request or else we'll
+                // forget what we sent and resend it. Instead, we'll keep 2x the max number of items that we've ever
+                // seen from this feed, up to a max of 10,000.
+                // Adopted from https://github.com/matrix-org/go-neb/blob/babb74fa729882d7265ff507b09080e732d060ae/services/rssbot/rssbot.go#L304
+                const maxGuids = Math.min(Math.max(2 * newGuids.length, seenGuids.length), 10_000);
+                const newSeenItems = Array.from(new Set([ ...newGuids, ...seenGuids ]).values()).slice(0, maxGuids);
+                this.seenEntries.set(url, newSeenItems);
+            }
+            this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url: url } });
+            // Clear any feed failures
+            this.feedsFailingHttp.delete(url);
+            this.feedsFailingParsing.delete(url);
+        } catch (err: unknown) {
+            if (axios.isAxiosError(err)) {
+                // No new feed items, skip.
+                if (err.response?.status === StatusCodes.NOT_MODIFIED) {
+                    return false;
+                }
+                this.feedsFailingHttp.add(url);
+            } else {
+                this.feedsFailingParsing.add(url);
+            }
+            const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
+            const feedError = new FeedError(url.toString(), error, fetchKey);
+            log.error("Unable to read feed:", feedError.message);
+            this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
+        } finally {
+            this.feedQueue.push(url);
+        }
+        return seenEntriesChanged;
+    }
+
     private async pollFeeds(): Promise<void> {
         log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds`);

-        let seenEntriesChanged = false;
-
         const fetchingStarted = Date.now();

-        for (const url of this.observedFeedUrls.values()) {
-            const fetchKey = randomUUID();
-            const { etag, lastModified } = this.cacheTimes.get(url) || {};
-            try {
-                const { response, feed } = await FeedReader.fetchFeed(
-                    url,
-                    {
-                        ...(lastModified && { 'If-Modified-Since': lastModified}),
-                        ...(etag && { 'If-None-Match': etag}),
-                    },
-                    // We don't want to wait forever for the feed.
-                    this.config.pollTimeoutSeconds * 1000,
-                    this.parser,
-                );
-
-                // Store any entity tags/cache times.
-                if (response.headers.ETag) {
-                    this.cacheTimes.set(url, { etag: response.headers.ETag});
-                } else if (response.headers['Last-Modified']) {
-                    this.cacheTimes.set(url, { lastModified: response.headers['Last-Modified'] });
-                }
+        const [ url ] = this.feedQueue.splice(0, 1);

-                let initialSync = false;
-                let seenGuids = this.seenEntries.get(url);
-                if (!seenGuids) {
-                    initialSync = true;
-                    seenGuids = [];
-                    seenEntriesChanged = true; // to ensure we only treat it as an initialSync once
-                }
-
-                // migrate legacy, cleartext guids to their md5-hashed counterparts
-                seenGuids = seenGuids.map(guid => guid.startsWith('md5:') ? guid : this.hashGuid(guid));
-
-                const seenGuidsSet = new Set(seenGuids);
-                const newGuids = [];
-                log.debug(`Found ${feed.items.length} entries in ${url}`);
-                for (const item of feed.items) {
-                    // Find the first guid-like that looks like a string.
-                    // Some feeds have a nasty habit of leading an empty tag there, making us parse it as garbage.
-                    const guid = [item.guid, item.id, item.link, item.title].find(id => typeof id === 'string' && id);
-                    if (!guid) {
-                        log.error(`Could not determine guid for entry in ${url}, skipping`);
-                        continue;
-                    }
-                    const hashedGuid = this.hashGuid(guid);
-                    newGuids.push(hashedGuid);
-
-                    if (initialSync) {
-                        log.debug(`Skipping entry ${guid} since we're performing an initial sync`);
-                        continue;
-                    }
-                    if (seenGuidsSet.has(hashedGuid)) {
-                        log.debug('Skipping already seen entry', guid);
-                        continue;
-                    }
-
-                    const entry = {
-                        feed: {
-                            title: feed.title ? stripHtml(feed.title) : null,
-                            url: url,
-                        },
-                        title: item.title ? stripHtml(item.title) : null,
-                        link: item.link || null,
-                        fetchKey
-                    };
-
-                    log.debug('New entry:', entry);
-                    seenEntriesChanged = true;
-
-                    this.queue.push<FeedEntry>({ eventName: 'feed.entry', sender: 'FeedReader', data: entry });
-                }
-
-                if (seenEntriesChanged) {
-                    // Some RSS feeds can return a very small number of items then bounce
-                    // back to their "normal" size, so we cannot just clobber the recent GUID list per request or else we'll
-                    // forget what we sent and resend it. Instead, we'll keep 2x the max number of items that we've ever
-                    // seen from this feed, up to a max of 10,000.
-                    // Adopted from https://github.com/matrix-org/go-neb/blob/babb74fa729882d7265ff507b09080e732d060ae/services/rssbot/rssbot.go#L304
-                    const maxGuids = Math.min(Math.max(2 * newGuids.length, seenGuids.length), 10_000);
-                    const newSeenItems = Array.from(new Set([ ...newGuids, ...seenGuids ]).values()).slice(0, maxGuids);
-                    this.seenEntries.set(url, newSeenItems);
-                }
-                this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url: url } });
-                // Clear any feed failures
-                this.feedsFailingHttp.delete(url);
-                this.feedsFailingParsing.delete(url);
-            } catch (err: unknown) {
-                if (axios.isAxiosError(err)) {
-                    // No new feed items, skip.
-                    if (err.response?.status === StatusCodes.NOT_MODIFIED) {
-                        continue;
-                    }
-                    this.feedsFailingHttp.add(url);
-                } else {
-                    this.feedsFailingParsing.add(url);
-                }
-                const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
-                const feedError = new FeedError(url.toString(), error, fetchKey);
-                log.error("Unable to read feed:", feedError.message);
-                this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
-            }
-        }
+        if (url) {
+            if (await this.pollFeed(url)) {
+                await this.saveSeenEntries();
+            }
+        }

         Metrics.feedsFailing.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailing.set({ reason: "parsing" }, this.feedsFailingParsing.size);

-        if (seenEntriesChanged) await this.saveSeenEntries();
-
         const elapsed = Date.now() - fetchingStarted;
         Metrics.feedFetchMs.set(elapsed);

-        let sleepFor: number;
-        if (elapsed > this.config.pollIntervalSeconds * 1000) {
-            log.warn(`It took us longer to update the feeds than the configured poll interval (${elapsed / 1000}s)`);
-            sleepFor = 0;
-        } else {
-            sleepFor = this.config.pollIntervalSeconds * 1000 - elapsed;
-            log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
-        }
+        const sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
+        log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
+
+        if (elapsed > this.sleepingInterval) {
+            log.warn(`It took us longer to update the feeds than the configured poll interval`);
+        }

         this.timeout = setTimeout(() => {
             if (!this.shouldRun) {
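
The diff is truncated at the setTimeout callback above. Below is a self-contained sketch of how such a rescheduling tail plausibly completes, using the fields the diff does show (shouldRun, timeout, pollFeeds, sleepingInterval); the callback body here is an assumption, not the commit's verbatim code.

// Hypothetical rescheduling loop: after each single-feed poll, schedule
// the next one after `sleepingInterval`, unless the bridge is stopping.
class PollLoop {
    private shouldRun = true;
    private timeout?: NodeJS.Timeout;

    constructor(private readonly sleepingIntervalMs: number) {}

    start(): void {
        void this.pollFeeds();
    }

    stop(): void {
        this.shouldRun = false;
        if (this.timeout) {
            clearTimeout(this.timeout);
        }
    }

    private async pollFeeds(): Promise<void> {
        // ... poll one feed from the queue here ...
        this.timeout = setTimeout(() => {
            if (!this.shouldRun) {
                return; // shutting down; do not reschedule
            }
            void this.pollFeeds();
        }, this.sleepingIntervalMs);
    }
}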