Backoff failing RSS feeds (#890)

* Backoff RSS requests if a url repeatedly fails.

* Increase max backoff time to a day

* Add backoff for failing feeds.

* Remove unused finally

* Add this.feedLastBackoff

* Rewrite in rust.

* linting

* pop

* Optimise backoff function further

* Drop only!

* fix test

* lint

* lint further

* Better comments

* Fix urls calculation

* Remove testing URL

* Add some variance to speed up while loop

* correct comment

* Follow the advice and use a VecDeque as it's slightly faster.

* Vastly better shuffle method

* Speed up checking for previous guids.

* fix hasher function

* lint

* Content doesn't need to be calculated twice.

* Slightly more efficient iteration

* Improve performance of backoff insertion

* Configure feed reader

* lint

* Ensure appending and removing from the queue works as expected.

* Ensure we do keep urls that have been removed.

* lint

* Inc/dec metrics as queue items are added/deleted.

* Add comment

* tidy up
Will Hunt 2024-02-20 22:21:19 +00:00 committed by GitHub
parent 3ff87b7564
commit 387f7c1ce9
13 changed files with 263 additions and 67 deletions

Cargo.lock (generated)

@@ -681,6 +681,7 @@ dependencies = [
"napi",
"napi-build",
"napi-derive",
"rand",
"reqwest",
"rgb",
"rss",

Cargo.toml

@@ -21,6 +21,7 @@ rss = "2.0"
atom_syndication = "0.12"
ruma = { version = "0.9", features = ["events", "html"] }
reqwest = "0.11"
rand = "0.8.5"
[build-dependencies]
napi-build = "2"

changelog.d/890.misc (new file)

@@ -0,0 +1 @@
Failing RSS/Atom feeds are now backed off before being retried. This should speed up large public deployments, where failing feeds could previously slow down polling.
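To illustrate the retry curve this adds: a minimal TypeScript sketch of how the delay grows under the defaults introduced further down in this diff (5 s base, exponent 1.05, 24 h cap). The real logic lives in the Rust `QueueWithBackoff` below, which additionally multiplies the base by a random factor in [0.5, 1.5); that factor is fixed at 1 here for readability.

```ts
const BACKOFF_TIME_MS = 5 * 1000;
const BACKOFF_POW = 1.05;
const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;

// next delay = base + (previous delay ^ exponent), capped at one day.
function nextBackoff(lastMs: number): number {
    return Math.min(BACKOFF_TIME_MS + Math.pow(lastMs, BACKOFF_POW), BACKOFF_TIME_MAX_MS);
}

let delay = 0;
for (let attempt = 1; attempt <= 5; attempt++) {
    delay = nextBackoff(delay);
    console.log(`failure #${attempt}: retry in ~${Math.round(delay / 1000)}s`);
}
// Roughly: 5s, 13s, 25s, 47s, 86s, ... climbing until the 24h ceiling is reached.
```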

@@ -58,7 +58,7 @@ describe('GitHub', () => {
return testEnv?.tearDown();
});
it.only('should be able to handle a GitHub event', async () => {
it('should be able to handle a GitHub event', async () => {
const user = testEnv.getUser('user');
const bridgeApi = await getBridgeApi(testEnv.opts.config?.widgets?.publicUrl!, user);
const testRoomId = await user.createRoom({ name: 'Test room', invite:[testEnv.botMxid] });

@@ -220,15 +220,16 @@ export class FeedConnection extends BaseConnection implements IConnection {
// We want to retry these sends, because sometimes the network / HS
// craps out.
const content = {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
};
await retry(
() => this.intent.sendEvent(this.roomId, {
msgtype: 'm.notice',
format: "org.matrix.custom.html",
formatted_body: md.renderInline(message),
body: message,
external_url: entry.link ?? undefined,
"uk.half-shot.matrix-hookshot.feeds.item": entry,
}),
() => this.intent.sendEvent(this.roomId, content),
SEND_EVENT_MAX_ATTEMPTS,
SEND_EVENT_INTERVAL_MS,
// Filter for showstopper errors like 4XX errors, but otherwise

@@ -35,8 +35,9 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
return this.feedGuids.has(url);
}
async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return this.feedGuids.get(url)?.includes(guid) ?? false;
async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
const existing = this.feedGuids.get(url);
return existing ? guids.filter((existingGuid) => existing.includes(existingGuid)) : [];
}
public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {

@@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens.";
const FEED_GUIDS = "feeds.guids.";
const log = new Logger("RedisASProvider");
export class RedisStorageContextualProvider implements IStorageProvider {
constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { }
public setSyncToken(token: string|null){
@@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
await this.redis.set(key, JSON.stringify(value));
}
public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
public async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
const feedKey = `${FEED_GUIDS}${url}`;
await this.redis.lpush(feedKey, ...guid);
await this.redis.lpush(feedKey, ...guids);
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
}
@@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
}
public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
let multi = this.redis.multi();
for (const guid of guids) {
multi = multi.lpos(`${FEED_GUIDS}${url}`, guid);
}
const res = await multi.exec();
if (res === null) {
// Just assume we've seen none.
return [];
}
return guids.filter((_guid, index) => res[index][1] !== null);
}
}
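A note on the tuple indexing above: with ioredis, `multi().exec()` resolves to an array of `[error, result]` pairs (or `null` if the transaction was discarded), one per queued command, which is why the seen-check looks at `res[index][1]`. A small illustration with made-up values:

```ts
// Assumed result of exec() for two pipelined LPOS calls, purely for illustration:
// only the first guid exists in the Redis list.
const res: [Error | null, unknown][] = [
    [null, 3],    // LPOS found "md5:aaa" at index 3
    [null, null], // LPOS did not find "md5:bbb"
];
const guids = ["md5:aaa", "md5:bbb"];
const seen = guids.filter((_guid, index) => res[index][1] !== null);
// seen === ["md5:aaa"]
```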

@@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
setStoredTempFile(key: string, value: string): Promise<void>;
getGitlabDiscussionThreads(connectionId: string): Promise<SerializedGitlabDiscussionThreads>;
setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void>;
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
hasSeenFeed(url: string): Promise<boolean>;
hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
}
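Moving from a per-guid `hasSeenFeedGuid` to the batched `hasSeenFeedGuids` trades one storage round trip per entry for one per poll. A rough usage sketch, assuming an `IBridgeStorageProvider` instance and made-up URL/guid values (the import path is also assumed):

```ts
import { IBridgeStorageProvider } from "./StorageProvider";

async function filterNewEntries(storage: IBridgeStorageProvider): Promise<string[]> {
    const url = "https://example.org/feed.xml";
    const guids = ["md5:aaa", "md5:bbb", "md5:ccc"];
    // The store returns the subset of guids it has already seen for this feed.
    const seen = await storage.hasSeenFeedGuids(url, ...guids);
    const unseen = guids.filter((guid) => !seen.includes(guid));
    // Record the new ones so the next poll skips them.
    await storage.storeFeedGuids(url, ...unseen);
    return unseen;
}
```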

@@ -9,8 +9,14 @@ import { randomUUID } from "crypto";
import { readFeed } from "../libRs";
import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import UserAgent from "../UserAgent";
import { QueueWithBackoff } from "../libRs";
const log = new Logger("FeedReader");
const BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
const BACKOFF_POW = 1.05;
const BACKOFF_TIME_MS = 5 * 1000;
export class FeedError extends Error {
constructor(
public url: string,
@@ -73,21 +79,11 @@ function normalizeUrl(input: string): string {
return url.toString();
}
function shuffle<T>(array: T[]): T[] {
for (let i = array.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[array[i], array[j]] = [array[j], array[i]];
}
return array;
}
export class FeedReader {
private connections: FeedConnection[];
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
private observedFeedUrls: Set<string> = new Set();
private feedQueue: string[] = [];
private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
// A set of last modified times for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -100,11 +96,12 @@ export class FeedReader {
private shouldRun = true;
private readonly timeouts: (NodeJS.Timeout|undefined)[];
private readonly feedsToRetain = new Set();
get sleepingInterval() {
return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
}
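To make the arithmetic concrete, a worked example with assumed numbers (not the project defaults):

```ts
const pollIntervalSeconds = 1800; // target: visit every feed roughly once per 30 minutes
const pollConcurrency = 4;        // four workers polling in parallel
const queueLength = 600;          // feeds currently queued

const sleepingInterval = ((pollIntervalSeconds * 1000) / (queueLength || 1)) * pollConcurrency;
// => 12000 ms per worker; 4 workers x 1 feed / 12 s still covers 600 feeds per 30 minutes.
```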
@@ -120,22 +117,49 @@
this.timeouts.fill(undefined);
Object.seal(this.timeouts);
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
this.calculateFeedUrls();
connectionManager.on('new-connection', c => {
if (c instanceof FeedConnection) {
log.debug('New connection tracked:', c.connectionId);
this.connections.push(c);
this.calculateFeedUrls();
const feeds = this.calculateInitialFeedUrls();
connectionManager.on('new-connection', newConnection => {
if (!(newConnection instanceof FeedConnection)) {
return;
}
const normalisedUrl = normalizeUrl(newConnection.feedUrl);
if (!feeds.has(normalisedUrl)) {
log.info(`Connection added, adding "${normalisedUrl}" to queue`);
this.feedQueue.push(normalisedUrl);
feeds.add(normalisedUrl);
Metrics.feedsCount.inc();
Metrics.feedsCountDeprecated.inc();
}
});
connectionManager.on('connection-removed', removed => {
if (removed instanceof FeedConnection) {
this.connections = this.connections.filter(c => c.connectionId !== removed.connectionId);
this.calculateFeedUrls();
if (!(removed instanceof FeedConnection)) {
return;
}
let shouldKeepUrl = false;
const normalisedUrl = normalizeUrl(removed.feedUrl);
this.connections = this.connections.filter(c => {
// Cheeky reuse of the iteration to determine whether we should keep this URL.
if (c.connectionId !== removed.connectionId) {
shouldKeepUrl = shouldKeepUrl || normalizeUrl(c.feedUrl) === normalisedUrl;
return true;
}
return false;
});
if (shouldKeepUrl) {
log.info(`Connection removed, but not removing "${normalisedUrl}" as it is still in use`);
return;
}
log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
this.feedsToRetain.delete(normalisedUrl);
this.feedQueue.remove(normalisedUrl);
feeds.delete(normalisedUrl);
this.feedsFailingHttp.delete(normalisedUrl);
this.feedsFailingParsing.delete(normalisedUrl);
Metrics.feedsCount.dec();
Metrics.feedsCountDeprecated.dec();
});
log.debug('Loaded feed URLs:', this.observedFeedUrls);
log.debug('Loaded feed URLs:', [...feeds].join(', '));
for (let i = 0; i < config.pollConcurrency; i++) {
void this.pollFeeds(i);
@@ -147,21 +171,24 @@
this.timeouts.forEach(t => clearTimeout(t));
}
private calculateFeedUrls(): void {
/**
* Calculate the initial feed set for the reader. Should never
* be called twice.
*/
private calculateInitialFeedUrls(): Set<string> {
// just in case we got an invalid URL somehow
const normalizedUrls = [];
const observedFeedUrls = new Set<string>();
for (const conn of this.connections) {
try {
normalizedUrls.push(normalizeUrl(conn.feedUrl));
observedFeedUrls.add(normalizeUrl(conn.feedUrl));
} catch (err: unknown) {
log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
}
}
this.observedFeedUrls = new Set(normalizedUrls);
this.feedQueue = shuffle([...this.observedFeedUrls.values()]);
Metrics.feedsCount.set(this.observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
this.feedQueue.populate([...observedFeedUrls]);
Metrics.feedsCount.set(observedFeedUrls.size);
Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
return observedFeedUrls;
}
/**
@@ -173,6 +200,11 @@
* @returns A boolean that returns if we saw any changes on the feed since the last poll time.
*/
public async pollFeed(url: string): Promise<boolean> {
// If a feed is deleted while it is being polled, we need
// to remember NOT to add it back to the queue. This
// set keeps track of all the feeds that *should* be
// requeued.
this.feedsToRetain.add(url);
let seenEntriesChanged = false;
const fetchKey = randomUUID();
const { etag, lastModified } = this.cacheTimes.get(url) || {};
@@ -203,22 +235,20 @@
if (feed) {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);
const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
for (const item of feed.items) {
// Some feeds have a nasty habit of including an empty leading tag, making us parse it as garbage.
if (!item.hashId) {
log.error(`Could not determine guid for entry in ${url}, skipping`);
continue;
}
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId);
if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
if (seenItems.includes(item.hashId)) {
continue;
}
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
newGuids.push(item.hashId);
if (initialSync) {
log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
continue;
}
const entry = {
@@ -243,12 +273,15 @@
if (seenEntriesChanged && newGuids.length) {
await this.storage.storeFeedGuids(url, ...newGuids);
}
}
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
// Clear any feed failures
this.feedsFailingHttp.delete(url);
this.feedsFailingParsing.delete(url);
if (this.feedsToRetain.has(url)) {
// Only requeue this feed if it hasn't been removed while we were polling it.
this.feedQueue.push(url);
}
} catch (err: unknown) {
// TODO: Proper Rust Type error.
if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
@@ -256,12 +289,11 @@
} else {
this.feedsFailingParsing.add(url);
}
const backoffDuration = this.feedQueue.backoff(url);
const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
const feedError = new FeedError(url.toString(), error, fetchKey);
log.error("Unable to read feed:", feedError.message);
log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
this.queue.push<FeedError>({ eventName: 'feed.error', sender: 'FeedReader', data: feedError});
} finally {
this.feedQueue.push(url);
}
return seenEntriesChanged;
}
@@ -277,11 +309,11 @@
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);
log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
const fetchingStarted = Date.now();
const [ url ] = this.feedQueue.splice(0, 1);
const url = this.feedQueue.pop();
let sleepFor = this.sleepingInterval;
if (url) {
@@ -298,7 +330,7 @@
log.warn(`It took us longer to update the feeds than the configured poll interval`);
}
} else {
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
log.debug(`No feeds available to poll for worker ${workerId}`);
}

@@ -70,7 +70,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.map(|f| f.value)
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok()),
.and_then(|f| hash_id(f).ok())
.map(|f| format!("md5:{}", f)),
})
.collect(),
}

@@ -3,6 +3,7 @@ pub mod feeds;
pub mod format_util;
pub mod github;
pub mod jira;
pub mod util;
#[macro_use]
extern crate napi_derive;

src/util/mod.rs (new file)

@@ -0,0 +1,148 @@
use rand::prelude::*;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::time::{SystemTime, UNIX_EPOCH};
const DEFAULT_BACKOFF_TIME_MAX_MS: f64 = 24f64 * 60f64 * 60f64 * 1000f64;
const DEFAULT_BACKOFF_POW: f64 = 1.05f64;
const DEFAULT_BACKOFF_TIME_MS: f64 = 5f64 * 1000f64;
#[napi]
pub struct QueueWithBackoff {
queue: VecDeque<String>,
/**
* A map of absolute backoff timestamps mapped to the value.
*/
backoff: BTreeMap<u64, String>,
/**
* The last duration applied when a value was backed off.
*/
last_backoff_duration: HashMap<String, u32>,
backoff_time: f64,
backoff_exponent: f64,
backoff_max: f64,
}
impl Default for QueueWithBackoff {
fn default() -> Self {
Self::new(
DEFAULT_BACKOFF_TIME_MS,
DEFAULT_BACKOFF_POW,
DEFAULT_BACKOFF_TIME_MAX_MS,
)
}
}
#[napi]
impl QueueWithBackoff {
#[napi(constructor)]
pub fn new(backoff_time: f64, backoff_exponent: f64, backoff_max: f64) -> Self {
QueueWithBackoff {
queue: VecDeque::new(),
backoff: BTreeMap::new(),
last_backoff_duration: HashMap::new(),
backoff_time,
backoff_exponent,
backoff_max,
}
}
#[napi]
pub fn pop(&mut self) -> Option<String> {
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap().as_millis() as u64;
// We only need to check this once, as we won't be adding to the backoff queue
// as often as we pull from it.
if let Some(item) = self.backoff.first_entry() {
if *item.key() < since_the_epoch {
let v = item.remove();
self.queue.push_back(v);
}
}
self.queue.pop_front()
}
#[napi]
pub fn remove(&mut self, item: String) -> bool {
// Remove from the queue
if let Ok(index) = self.queue.binary_search(&item) {
self.queue.remove(index);
return true;
} else {
// We didn't find the item in the queue, so make sure we also remove it
// from any backoff entry.
// This is *expensive* but hopefully called rarely.
let mut found_key: u64 = 0;
for (key, value) in self.backoff.iter() {
if *value == item {
found_key = *key;
}
}
if found_key != 0 {
self.backoff.remove(&found_key);
return true;
}
}
// Always remove the duration on removal.
self.last_backoff_duration.remove(&item);
false
}
#[napi]
pub fn push(&mut self, item: String) {
self.last_backoff_duration.remove(&item);
self.queue.push_back(item);
}
#[napi]
pub fn backoff(&mut self, item: String) -> u32 {
let last_backoff = (*self.last_backoff_duration.get(&item).unwrap_or(&0)) as f64;
let mut rng = rand::thread_rng();
let y: f64 = rng.gen::<f64>() + 0.5f64; // generates a float in the range [0.5, 1.5)
let backoff_duration = ((y * self.backoff_time) + last_backoff.powf(self.backoff_exponent))
.min(self.backoff_max) as u32;
let backoff_item = item.clone();
self.last_backoff_duration.insert(item, backoff_duration);
let start = SystemTime::now();
let since_the_epoch = start.duration_since(UNIX_EPOCH).unwrap();
let mut time = since_the_epoch.as_millis() as u64 + backoff_duration as u64;
// If the backoff queue contains this time (likely)
// then we want to increase the backoff time slightly
// to allow for it.
let incr: f64 = (rng.gen::<f64>() * 2f64) + 2f64;
while self.backoff.contains_key(&time) {
time += (incr * self.backoff_time) as u64;
}
self.backoff.insert(time, backoff_item);
backoff_duration
}
#[napi]
pub fn length(&self) -> u32 {
self.queue.len() as u32
}
fn shuffle(&mut self) {
let mut rng = rand::thread_rng();
self.queue.make_contiguous().shuffle(&mut rng);
}
#[napi]
pub fn populate(&mut self, values: Vec<String>) {
// This assumes an empty queue.
for v in values {
self.queue.push_back(v);
}
self.shuffle();
}
}
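Because the struct is exposed through napi and re-exported via `libRs` (as the FeedReader import above shows), the TypeScript side sees it as an ordinary class. A rough usage sketch; the generated typings (e.g. `string | null` for `pop()`) are assumed rather than copied from the build output:

```ts
import { QueueWithBackoff } from "../libRs";

// The same defaults the FeedReader passes in: 5 s base, exponent 1.05, 24 h cap.
const queue = new QueueWithBackoff(5 * 1000, 1.05, 24 * 60 * 60 * 1000);
queue.populate(["https://example.org/a.xml", "https://example.org/b.xml"]); // shuffled internally

async function pollNext(poll: (url: string) => Promise<void>): Promise<void> {
    // pop() also re-queues the earliest backed-off URL if its retry time has passed.
    const url = queue.pop();
    if (!url) {
        return; // nothing ready to poll right now
    }
    try {
        await poll(url);
        queue.push(url); // success: straight back onto the end of the queue
    } catch (err) {
        // failure: park the URL; it cannot be popped again until the delay elapses.
        const delayMs = queue.backoff(url);
        console.warn(`Backing off ${url} for ${delayMs}ms`, err);
    }
}
```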

@@ -23,6 +23,7 @@ const GITHUB_ISSUE = {
},
html_url: `https://github.com/${GITHUB_ORG_REPO.org}/${GITHUB_ORG_REPO.repo}/issues/1234`,
title: "My issue",
assignees: []
};
const GITHUB_ISSUE_CREATED_PAYLOAD = {
@@ -137,7 +138,7 @@ describe("GitHubRepoConnection", () => {
intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.html_url, 0);
intent.expectEventBodyContains(GITHUB_ISSUE_CREATED_PAYLOAD.issue.title, 0);
});
it.only("will handle assignees on issue creation", async () => {
it("will handle assignees on issue creation", async () => {
const { connection, intent } = createConnection();
await connection.onIssueCreated({
...GITHUB_ISSUE_CREATED_PAYLOAD,