Rewrite in rust.

Half-Shot 2024-02-06 12:58:37 +00:00
parent 18429cef94
commit 80d4b9badc
5 changed files with 113 additions and 38 deletions

Cargo.lock (generated; 1 line changed)

@@ -681,6 +681,7 @@ dependencies = [
  "napi",
  "napi-build",
  "napi-derive",
+ "rand",
  "reqwest",
  "rgb",
  "rss",

Cargo.toml

@@ -21,6 +21,7 @@ rss = "2.0"
 atom_syndication = "0.12"
 ruma = { version = "0.9", features = ["events", "html"] }
 reqwest = "0.11"
+rand = "0.8.5"

 [build-dependencies]
 napi-build = "2"

src/feeds/FeedReader.ts

@@ -9,10 +9,7 @@ import { randomUUID } from "crypto";
 import { readFeed } from "../libRs";
 import { IBridgeStorageProvider } from "../Stores/StorageProvider";
 import UserAgent from "../UserAgent";
+import { QueueWithBackoff } from "../libRs";
-
-const FEED_BACKOFF_TIME_MS = 5 * 1000;
-const FEED_BACKOFF_POW = 1.05;
-const FEED_BACKOFF_TIME_MAX_MS = 24 * 60 * 60 * 1000;

 const log = new Logger("FeedReader");

 export class FeedError extends Error {
@@ -77,24 +74,13 @@ function normalizeUrl(input: string): string {
     return url.toString();
 }

-function shuffle<T>(array: T[]): T[] {
-    for (let i = array.length - 1; i > 0; i--) {
-        const j = Math.floor(Math.random() * (i + 1));
-        [array[i], array[j]] = [array[j], array[i]];
-    }
-    return array;
-}
-
 export class FeedReader {

     private connections: FeedConnection[];
     // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
     private observedFeedUrls: Set<string> = new Set();
-    private feedQueue: string[] = [];
-    private feedBackoff: Map<string, number> = new Map();
-    private feedLastBackoff: Map<string, number> = new Map();
+    private feedQueue = new QueueWithBackoff();

     // A set of last modified times for each url.
     private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@@ -111,7 +97,7 @@ export class FeedReader {
     get sleepingInterval() {
         return (
             // Calculate the number of MS to wait in between feeds.
-            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
+            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
             // And multiply by the number of concurrent readers
         ) * this.config.pollConcurrency;
     }
@@ -156,16 +142,16 @@ export class FeedReader {
     private calculateFeedUrls(): void {
         // just in case we got an invalid URL somehow
-        const normalizedUrls = [];
+        const observedFeedUrls = new Set<string>();
         for (const conn of this.connections) {
             try {
-                normalizedUrls.push(normalizeUrl(conn.feedUrl));
+                observedFeedUrls.add(normalizeUrl(conn.feedUrl));
             } catch (err: unknown) {
                 log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
             }
         }
-        this.observedFeedUrls = new Set(normalizedUrls);
-        this.feedQueue = shuffle([...this.observedFeedUrls.values()]);
+        observedFeedUrls.forEach(url => this.feedQueue.push(url));
+        this.feedQueue.shuffle();
         Metrics.feedsCount.set(this.observedFeedUrls.size);
         Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
@@ -256,7 +242,6 @@ export class FeedReader {
             // Clear any feed failures
             this.feedsFailingHttp.delete(url);
             this.feedsFailingParsing.delete(url);
-            this.feedLastBackoff.delete(url);
             this.feedQueue.push(url);
         } catch (err: unknown) {
             // TODO: Proper Rust Type error.
@@ -265,10 +250,7 @@ export class FeedReader {
             } else {
                 this.feedsFailingParsing.add(url);
             }
-            const backoffDuration = Math.min(FEED_BACKOFF_TIME_MAX_MS, (
-                Math.ceil((Math.random() + 0.5) * FEED_BACKOFF_TIME_MS)) + Math.pow(this.feedLastBackoff.get(url) ?? 0, FEED_BACKOFF_POW));
-            this.feedBackoff.set(url, Date.now() + backoffDuration);
-            this.feedLastBackoff.set(url, backoffDuration);
+            const backoffDuration = this.feedQueue.backoff(url);
             const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
             const feedError = new FeedError(url.toString(), error, fetchKey);
             log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
@@ -288,11 +270,11 @@ export class FeedReader {
         Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);

-        log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
+        log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);

         const fetchingStarted = Date.now();

-        const [ url ] = this.feedQueue.splice(0, 1);
+        const url = this.feedQueue.next();

         let sleepFor = this.sleepingInterval;
         if (url) {
@@ -319,15 +301,5 @@ export class FeedReader {
             }
             void this.pollFeeds(workerId);
         }, sleepFor);
-
-        // Reinsert any feeds that we may have backed off.
-        for (const [feedUrl, retryAfter] of this.feedBackoff.entries()) {
-            if (retryAfter < Date.now()) {
-                log.debug(`Adding back ${feedUrl} from backoff set`);
-                this.feedQueue.push(feedUrl);
-                // Store the last backoff time.
-                this.feedBackoff.delete(feedUrl)
-            }
-        }
     }
 }
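The backoff arithmetic that used to live in the deleted TypeScript constants moves into src/util/mod.rs below. As a rough standalone sketch (not part of this commit), the following program traces how the delay grows across consecutive failures using the same constants; the random jitter is pinned to 1.0 here so the trace is deterministic:

    // Standalone sketch: traces the backoff curve implemented in src/util/mod.rs.
    // The real jitter is a random float in [0.5, 1.5); fixed at 1.0 here.
    const BACKOFF_TIME_MS: f32 = 5.0 * 1000.0;
    const BACKOFF_POW: f32 = 1.05;
    const BACKOFF_TIME_MAX_MS: f32 = 24.0 * 60.0 * 60.0 * 1000.0;

    fn main() {
        let mut last_backoff: f32 = 0.0;
        for failure in 1..=10 {
            // Same formula as QueueWithBackoff::backoff, with jitter = 1.0.
            let backoff = ((1.0 * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW))
                .min(BACKOFF_TIME_MAX_MS);
            println!("failure {failure}: wait {backoff:.0}ms");
            last_backoff = backoff;
        }
    }

Because the exponent is only 1.05, growth is gentle at first (5s, ~12.7s, ~25s, ...) but compounds until it hits the 24-hour cap after enough consecutive failures.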

src/lib.rs

@@ -3,6 +3,7 @@ pub mod feeds;
 pub mod format_util;
 pub mod github;
 pub mod jira;
+pub mod util;

 #[macro_use]
 extern crate napi_derive;

src/util/mod.rs (new file, 100 lines)

@@ -0,0 +1,100 @@
use std::collections::LinkedList;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use rand::prelude::*;

const BACKOFF_TIME_MAX_MS: f32 = 24f32 * 60f32 * 60f32 * 1000f32;
const BACKOFF_POW: f32 = 1.05f32;
const BACKOFF_TIME_MS: f32 = 5f32 * 1000f32;
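// Together these mean: each failure waits (jitter in [0.5, 1.5)) * 5s plus the
// previous backoff raised to the 1.05 power, capped at 24 hours.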

#[napi]
pub struct QueueWithBackoff {
    queue: LinkedList<String>,
    backoff: HashMap<String, u128>,
    last_backoff: HashMap<String, u32>
}

#[napi]
impl QueueWithBackoff {
    #[napi(constructor)]
    pub fn new() -> Self {
        QueueWithBackoff {
            queue: LinkedList::new(),
            backoff: HashMap::new(),
            last_backoff: HashMap::new(),
        }
    }

    #[napi]
    pub fn next(&mut self) -> Option<String> {
        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH).unwrap();
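        // Before popping, move any items whose backoff has expired back onto
        // the end of the queue.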
        let mut items_to_rm: Vec<String> = vec![];
        for item in self.backoff.iter() {
            if *item.1 < since_the_epoch.as_millis() {
                self.queue.push_back(item.0.clone());
                items_to_rm.push(item.0.clone());
            }
        }

        for item in items_to_rm {
            self.backoff.remove(&item);
        }

        self.queue.pop_front()
    }

    #[napi]
    pub fn push(&mut self, item: String) {
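        // A re-queued item is healthy again: drop its backoff history.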
        self.last_backoff.remove(&item);
        self.queue.push_back(item);
    }

    #[napi]
    pub fn backoff(&mut self, item: String) -> u32 {
        let last_backoff = (*self.last_backoff.get(&item).unwrap_or(&0)) as f32;
        let mut rng = rand::thread_rng();
        let y: f32 = rng.gen::<f32>() + 0.5f32; // generates a float between 0.5 and 1.5
        let backoff_duration = ((y * BACKOFF_TIME_MS) + last_backoff.powf(BACKOFF_POW)).min(BACKOFF_TIME_MAX_MS) as u32;
        let backoff_item = item.clone();
        self.last_backoff.insert(item, backoff_duration);

        let start = SystemTime::now();
        let since_the_epoch = start
            .duration_since(UNIX_EPOCH).unwrap();
        let time = since_the_epoch.as_millis() + backoff_duration as u128;
        self.backoff.insert(backoff_item, time);
        backoff_duration
    }

    #[napi]
    pub fn length(&self) -> u32 {
        self.queue.len() as u32
    }

    #[napi]
    pub fn shuffle(&mut self) {
        let mut rng = rand::thread_rng();
        let old_queue = self.queue.clone();
        self.queue.clear();
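        // Coin-flip each item to the front or back of the new queue. Cheap,
        // though not a uniform (Fisher-Yates) shuffle.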
        for item in old_queue {
            if rng.gen_bool(0.5) {
                self.queue.push_front(item);
            } else {
                self.queue.push_back(item);
            }
        }
    }
}
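
A minimal usage sketch, not part of this commit: a unit test that could sit at the bottom of this file, driving the queue the way the TypeScript caller does (push, shuffle, pop, then back off a failing item). The first backoff for an item always lands in [2500, 7500) ms, since there is no prior backoff to raise to BACKOFF_POW:

    #[cfg(test)]
    mod tests {
        use super::QueueWithBackoff;

        #[test]
        fn pop_then_backoff() {
            let mut q = QueueWithBackoff::new();
            q.push("https://example.com/feed.xml".to_string());
            q.push("https://example.org/feed.xml".to_string());
            q.shuffle();
            assert_eq!(q.length(), 2);

            // Pop one feed, as a polling worker would.
            let url = q.next().expect("queue should not be empty");
            assert_eq!(q.length(), 1);

            // First failure: the delay is jitter * 5s with no prior term,
            // i.e. somewhere in [2500, 7500) ms.
            let delay = q.backoff(url);
            assert!((2500..7500).contains(&delay));
        }
    }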