From bf88aab67d9663353402968f119be9e2d9699132 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Mon, 12 Feb 2024 10:54:37 +0000 Subject: [PATCH] More cleanup --- Cargo.lock | 68 +++++++++++++++++++++++++++------ Cargo.toml | 1 + src/feeds/parser.rs | 7 ++-- src/feeds/reader.rs | 89 ++++++++++++++++++++++++++------------------ src/stores/memory.rs | 18 ++++----- src/stores/traits.rs | 8 ++-- 6 files changed, 124 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 29141553..3fd4e791 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -616,19 +616,46 @@ dependencies = [ ] [[package]] -name = "futures-channel" -version = "0.3.29" +name = "futures" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" @@ -665,28 +692,44 @@ dependencies = [ ] [[package]] -name = "futures-sink" -version = "0.3.29" +name = "futures-macro" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.42", +] + +[[package]] +name = "futures-sink" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -1025,6 +1068,7 @@ dependencies = [ "async-std", "atom_syndication", "contrast", + "futures", "hex", "md-5", "napi", diff --git a/Cargo.toml b/Cargo.toml index 8ef64aef..76273c94 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ rand = "0.8.5" uuid = { version = "1.7.0", features = ["v4"] } async-std = "1.12.0" redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } +futures = "0.3.30" [build-dependencies] napi-build = "2" diff --git a/src/feeds/parser.rs b/src/feeds/parser.rs index f6e84503..b0c4d1e0 100644 --- a/src/feeds/parser.rs +++ b/src/feeds/parser.rs @@ -20,7 +20,7 @@ pub struct FeedItem { pub pubdate: Option, pub summary: Option, pub author: Option, - pub hash_id: Option, + pub hash_id: String, } #[derive(Serialize, Debug, Deserialize)] @@ -71,7 +71,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { .or(item.link.clone()) .or(item.title.clone()) .and_then(|f| hash_id(f).ok()) - .and_then(|f| Some(format!("md5:{}", f))), + .and_then(|f| Some(format!("md5:{}", f))) + .unwrap(), // TODO: Handle error }) .collect(), } @@ -118,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel { .map(|date| date.to_rfc2822()), summary: item.summary().map(|v| v.value.clone()), author: authors_to_string(item.authors()), - hash_id: hash_id(item.id.clone()).ok(), + hash_id: hash_id(item.id.clone()).unwrap(), }) .collect(), } diff --git a/src/feeds/reader.rs b/src/feeds/reader.rs index bce483e2..148f38e4 100644 --- a/src/feeds/reader.rs +++ b/src/feeds/reader.rs @@ -1,10 +1,12 @@ use std::collections::{HashMap, HashSet}; +use std::future; use crate::util::QueueWithBackoff; use std::time::{Duration, Instant}; -use napi::bindgen_prelude::{Error as JsError, Status}; +use napi::bindgen_prelude::Error as JsError; use napi::tokio::sync::RwLock; use std::sync::Arc; use uuid::Uuid; +use futures::future::{Future, select_all}; use crate::feeds::parser::{js_read_feed, ReadFeedOptions}; use crate::stores::memory::MemoryStorageProvider; use crate::stores::traits::StorageProvider; @@ -51,7 +53,6 @@ pub struct FeedReader { queue: QueueWithBackoff, feeds_to_retain: HashSet, cache_times: Arc>>, - storage_provider: Box, poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64, @@ -70,14 +71,12 @@ impl FeedReader { pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self { let mut cache_times: HashMap = HashMap::new(); let mut lock = Arc::new(RwLock::new(cache_times)); - let mut storage_provider = MemoryStorageProvider::new(); FeedReader { queue: QueueWithBackoff::new( BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS, ), - storage_provider, feeds_to_retain: HashSet::new(), poll_interval_seconds, poll_concurrency, @@ -105,7 +104,7 @@ impl FeedReader { } - async fn poll_feed(&self, url: &String, cache_times: Arc>>) -> Result, JsError> { + async fn poll_feed(&self, url: &String, cache_times: Arc>>, mut storage: &impl StorageProvider) -> Result, JsError> { let seen_entries_changed = false; let fetch_key = Uuid::new_v4().to_string(); @@ -128,13 +127,17 @@ impl FeedReader { }); drop(c_t_w); - let initial_sync = false; // TODO: Implement - let seen_items: HashSet = HashSet::new(); // TODO: Implement + let initial_sync = storage.has_seen_feed(url).await; let mut new_guids: Vec = Vec::new(); let new_entries: Vec = Vec::new(); if let Some(feed) = result.feed { + let items = feed.items.iter().map(|x| x.hash_id.clone()).collect::>(); + let seen_items = storage.has_seen_feed_guids( + url, + &items, + ).await; println!("Got feed result!"); let mut feed_info = HookshotFeedInfo { title: feed.title, @@ -144,26 +147,25 @@ impl FeedReader { }; for item in feed.items { println!("Got feed result! {:?}", item); - if let Some(hash_id) = item.hash_id { - if seen_items.contains(&hash_id) { - continue; - } - // TODO: Drop unwrap - new_guids.push(hash_id); - - if initial_sync { - // Skip. - continue; - } - feed_info.entries.push(HookshotFeedEntry { - title: item.title, - pubdate: item.pubdate, - summary: item.summary, - author: item.author, - link: item.link, - }); + if seen_items.contains(&item.hash_id) { + continue; } + + new_guids.push(item.hash_id.clone()); + + if initial_sync { + // Skip. + continue; + } + feed_info.entries.push(HookshotFeedEntry { + title: item.title, + pubdate: item.pubdate, + summary: item.summary, + author: item.author, + link: item.link, + }); } + storage.store_feed_guids(&url, &new_guids).await; return Ok(Some(feed_info)); } else { // TODO: Implement @@ -178,20 +180,33 @@ impl FeedReader { return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64; } + async unsafe fn poll_feed_int(&mut self, url: &String, mut storage_provider: &impl StorageProvider) { + let mut sleep_for = self.sleeping_interval(); + self.feeds_to_retain.insert(url.clone()); + let now = Instant::now(); + let result = self.poll_feed(url, self.cache_times.clone(), &storage_provider).await; + self.feeds_to_retain.remove(url); + let elapsed = now.elapsed(); + sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); + async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; + } + #[napi] pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> { - let mut sleep_for = self.sleeping_interval(); - if let Some(url) = self.queue.pop() { - self.feeds_to_retain.insert(url.clone()); - let now = Instant::now(); - let result = self.poll_feed(&url, self.cache_times.clone()).await?; - self.feeds_to_retain.remove(&url); - let elapsed = now.elapsed(); - sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0); - } else { - println!("No feeds available"); + let concurrency = self.poll_concurrency as usize; + let mut storage_provider = MemoryStorageProvider::new(); + let mut future_set: Vec<_> = Vec::new(); + loop { + if let Some(url) = self.queue.pop() { + let result = Box::pin(self.poll_feed_int(&url, &storage_provider)); + future_set.push(result); + } else { + async_std::task::sleep(Duration::from_millis(self.sleeping_interval() as u64)).await; + } + if future_set.len() >= concurrency { + let (item_resolved, ready_future_index, _remaining_futures) = + select_all(future_set).await; + } } - async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await; - Ok(()) } } \ No newline at end of file diff --git a/src/stores/memory.rs b/src/stores/memory.rs index afdce24c..1293d00f 100644 --- a/src/stores/memory.rs +++ b/src/stores/memory.rs @@ -14,23 +14,19 @@ impl MemoryStorageProvider { } impl StorageProvider for MemoryStorageProvider { - async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result<(), Err> { - let mut guid_set = self.guids.get(url).or_else(|| { - let new = HashSet::new(); - self.guids.insert(url.clone(), new); - Some(&new) - }).unwrap(); + async fn store_feed_guids(&mut self, url: &String, guids: &Vec) { + let guid_set = self.guids.entry(url.clone()).or_insert(HashSet::default()); + for guid in guids { guid_set.insert(guid.clone()); } - Ok(()) } - async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result> { - Ok(self.guids.contains_key(url)) + async fn has_seen_feed(&self, url: &String) -> bool { + self.guids.contains_key(url) } - async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Result, Err> { + async fn has_seen_feed_guids(&self,url: &String, guids: &Vec) -> Vec { let mut seen_guids = Vec::default(); if let Some(existing_guids) = self.guids.get(url) { for guid in guids { @@ -39,6 +35,6 @@ impl StorageProvider for MemoryStorageProvider { } } } - Ok(seen_guids) + seen_guids } } \ No newline at end of file diff --git a/src/stores/traits.rs b/src/stores/traits.rs index aa841822..f44fcc81 100644 --- a/src/stores/traits.rs +++ b/src/stores/traits.rs @@ -1,5 +1,5 @@ -trait StorageProvider { - async fn store_feed_guids(&mut self, url: &String, guids: &Vec) -> Result>; - async fn has_seen_feed(&self, url: &String, guids: &Vec) -> Result>; - async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Result, Err>; +pub trait StorageProvider { + async fn store_feed_guids(&mut self, url: &String, guids: &Vec); + async fn has_seen_feed(&self, url: &String) -> bool; + async fn has_seen_feed_guids(&self, url: &String, guids: &Vec) -> Vec; } \ No newline at end of file