diff --git a/Cargo.lock b/Cargo.lock
index 7d8fdad6..e844d1cc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -681,6 +681,7 @@ dependencies = [
  "napi",
  "napi-build",
  "napi-derive",
+ "rand",
  "reqwest",
  "rgb",
  "rss",
diff --git a/Cargo.toml b/Cargo.toml
index 4d75e408..ab6e33d6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ rss = "2.0"
 atom_syndication = "0.12"
 ruma = { version = "0.9", features = ["events", "html"] }
 reqwest = "0.11"
+rand = "0.8.5"
 
 [build-dependencies]
 napi-build = "2"
diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts
index d4909ba7..28fcf7a9 100644
--- a/src/feeds/FeedReader.ts
+++ b/src/feeds/FeedReader.ts
@@ -9,10 +9,7 @@
 import { randomUUID } from "crypto";
 import { readFeed } from "../libRs";
 import { IBridgeStorageProvider } from "../Stores/StorageProvider";
 import UserAgent from "../UserAgent";
-
-const FEED_BACKOFF_TIME_MS = 5 * 1000;
-const FEED_BACKOFF_POW = 1.05;
-const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;
+import { QueueWithBackoff } from "../libRs";
 const log = new Logger("FeedReader");
 export class FeedError extends Error {
@@ -77,24 +74,13 @@
     return url.toString();
 }
 
-function shuffle<T>(array: T[]): T[] {
-    for (let i = array.length - 1; i > 0; i--) {
-        const j = Math.floor(Math.random() * (i + 1));
-        [array[i], array[j]] = [array[j], array[i]];
-    }
-    return array;
-}
-
 export class FeedReader {
 
     private connections: FeedConnection[];
     // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
     private observedFeedUrls: Set<string> = new Set();
 
-    private feedQueue: string[] = [];
-
-    private feedBackoff: Map<string, number> = new Map();
-    private feedLastBackoff: Map<string, number> = new Map();
+    private feedQueue = new QueueWithBackoff();
 
     // A set of last modified times for each url.
     private cacheTimes: Map<string, { etag?: string, lastModified?: string }> = new Map();
@@ -111,7 +97,7 @@
     get sleepingInterval() {
         return (
             // Calculate the number of MS to wait in between feeds.
-            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
+            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
             // And multiply by the number of concurrent readers
         ) * this.config.pollConcurrency;
     }
@@ -156,16 +142,17 @@
     private calculateFeedUrls(): void {
         // just in case we got an invalid URL somehow
-        const normalizedUrls = [];
+        const observedFeedUrls = new Set<string>();
         for (const conn of this.connections) {
             try {
-                normalizedUrls.push(normalizeUrl(conn.feedUrl));
+                observedFeedUrls.add(normalizeUrl(conn.feedUrl));
             } catch (err: unknown) {
                 log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
             }
         }
-        this.observedFeedUrls = new Set(normalizedUrls);
-        this.feedQueue = shuffle([...this.observedFeedUrls.values()]);
+        this.observedFeedUrls = observedFeedUrls;
+        observedFeedUrls.forEach(url => this.feedQueue.push(url));
+        this.feedQueue.shuffle();
 
         Metrics.feedsCount.set(this.observedFeedUrls.size);
         Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
 
@@ -256,7 +243,6 @@
                 // Clear any feed failures
                 this.feedsFailingHttp.delete(url);
                 this.feedsFailingParsing.delete(url);
-                this.feedLastBackoff.delete(url);
                 this.feedQueue.push(url);
             } catch (err: unknown) {
                 // TODO: Proper Rust Type error.
@@ -265,10 +251,7 @@ export class FeedReader {
             } else {
                 this.feedsFailingParsing.add(url);
             }
-            const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, (
-                Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW));
-            this.feedBackoff.set(url, Date.now() + backoffDuration);
-            this.feedLastBackoff.set(url, backoffDuration);
+            const backoffDuration = this.feedQueue.backoff(url);
             const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
             const feedError = new FeedError(url.toString(), error, fetchKey);
             log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
@@ -288,11 +271,11 @@
         Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);
 
-        log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
+        log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
 
         const fetchingStarted = Date.now();
-        const [ url ] = this.feedQueue.splice(0, 1);
+        const url = this.feedQueue.next();
 
         let sleepFor = this.sleepingInterval;
 
         if (url) {
@@ -319,15 +302,5 @@
             }
             void this.pollFeeds(workerId);
         }, sleepFor);
-
-        // Reinsert any feeds that we may have backed off.
-        for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) {
-            if (retryAfter < Date.now()) {
-                log.debug(`Adding back ${feedUrl} from backoff set`);
-                this.feedQueue.push(feedUrl);
-                // Store the last backoff time.
-                this.feedBackoff.delete(feedUrl)
-            }
-        }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 1d03f680..3a15bd65 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,7 @@ pub mod feeds;
 pub mod format_util;
 pub mod github;
 pub mod jira;
+pub mod util;
 
 #[macro_use]
 extern crate napi_derive;
diff --git a/src/util/mod.rs b/src/util/mod.rs
new file mode 100644
index 00000000..42e45353
--- /dev/null
+++ b/src/util/mod.rs
@@ -0,0 +1,105 @@
+use std::collections::{HashMap, LinkedList};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use rand::prelude::*;
+
+const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32;
+const BACKOFF_POW: f32 = 1.05f32;
+const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32;
+
+#[napi]
+pub struct QueueWithBackoff {
+    queue: LinkedList<String>,
+    /// Absolute times (in ms since the epoch) at which backed-off items
+    /// may rejoin the queue.
+    backoff: HashMap<String, u128>,
+    /// The duration (in ms) of the last backoff applied to each item.
+    last_backoff: HashMap<String, u32>,
+}
+
+#[napi]
+impl QueueWithBackoff {
+    #[napi(constructor)]
+    pub fn new() -> Self {
+        QueueWithBackoff {
+            queue: LinkedList::new(),
+            backoff: HashMap::new(),
+            last_backoff: HashMap::new(),
+        }
+    }
+
+    /// Pop the next item from the queue, after first requeueing any
+    /// backed-off items whose backoff period has expired.
+    #[napi]
+    pub fn next(&mut self) -> Option<String> {
+        let since_the_epoch = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap();
+
+        let mut items_to_rm: Vec<String> = vec![];
+        for (item, backoff_until) in self.backoff.iter() {
+            if *backoff_until < since_the_epoch.as_millis() {
+                self.queue.push_back(item.clone());
+                items_to_rm.push(item.clone());
+            }
+        }
+
+        for item in items_to_rm {
+            self.backoff.remove(&item);
+        }
+
+        self.queue.pop_front()
+    }
+
+    /// Add an item to the back of the queue, clearing any backoff state.
+    #[napi]
+    pub fn push(&mut self, item: String) {
+        self.last_backoff.remove(&item);
+        self.queue.push_back(item);
+    }
+
+    /// Take an item out of rotation, returning the backoff duration in ms.
+    /// Consecutive backoffs grow exponentially, with jitter, up to 24 hours.
+    #[napi]
+    pub fn backoff(&mut self, item: String) -> u32 {
+        let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32;
+
+        let mut rng = rand::thread_rng();
+        let y: f32 = rng.gen::<f32>() + 0.5f32; // jitter factor between 0.5 and 1.5
+
+        let backoff_duration =
+            ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32;
+        let backoff_item = item.clone();
+        self.last_backoff.insert(item, backoff_duration);
+
+        let since_the_epoch = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap();
+        let time = since_the_epoch.as_millis() + backoff_duration as u128;
+
+        self.backoff.insert(backoff_item, time);
+        backoff_duration
+    }
+
+    #[napi]
+    pub fn length(&self) -> u32 {
+        self.queue.len() as u32
+    }
+
+    /// Roughly shuffle the queue by moving each item to either the front
+    /// or the back at random. Not a uniform shuffle, but cheap and good
+    /// enough to spread feed URLs out on startup.
+    #[napi]
+    pub fn shuffle(&mut self) {
+        let mut rng = rand::thread_rng();
+        let old_queue = self.queue.clone();
+        self.queue.clear();
+        for item in old_queue {
+            if rng.gen_bool(0.5) {
+                self.queue.push_front(item);
+            } else {
+                self.queue.push_back(item);
+            }
+        }
+    }
+}
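
Not part of the patch: a minimal test sketch for the new queue, which could sit at the bottom of src/util/mod.rs. It assumes the `#[napi]` methods remain callable as plain Rust within the crate (the macro keeps the original impl); the test names and asserted bounds are illustrative, derived from BACKOFF_TIME_MS (5000ms) and the 0.5–1.5 jitter factor above.

```rust
#[cfg(test)]
mod tests {
    use super::QueueWithBackoff;

    #[test]
    fn pops_items_in_push_order() {
        let mut q = QueueWithBackoff::new();
        q.push("a".to_string());
        q.push("b".to_string());
        assert_eq!(q.length(), 2);
        assert_eq!(q.next(), Some("a".to_string()));
        assert_eq!(q.next(), Some("b".to_string()));
        assert_eq!(q.next(), None);
    }

    #[test]
    fn backoff_delays_and_grows() {
        let mut q = QueueWithBackoff::new();

        // First backoff: last_backoff is 0, so the duration is just
        // jitter (0.5..1.5) * BACKOFF_TIME_MS (5000ms).
        let first = q.backoff("feed".to_string());
        assert!((2500..7500).contains(&first));

        // The item stays out of rotation until its deadline passes.
        assert_eq!(q.next(), None);

        // A consecutive backoff is strictly longer, since it adds
        // last_backoff^1.05 on top of fresh jitter.
        let second = q.backoff("feed".to_string());
        assert!(second > first);

        // push() requeues the item immediately and resets its backoff state.
        q.push("feed".to_string());
        assert_eq!(q.next(), Some("feed".to_string()));
    }
}
```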