More cleanup

This commit is contained in:
Will Hunt 2024-02-12 10:54:37 +00:00
parent 4b626ee420
commit bf88aab67d
6 changed files with 124 additions and 67 deletions

68
Cargo.lock generated
View File

@ -616,19 +616,46 @@ dependencies = [
]
[[package]]
name = "futures-channel"
version = "0.3.29"
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
@ -665,28 +692,44 @@ dependencies = [
]
[[package]]
name = "futures-sink"
version = "0.3.29"
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.42",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]]
name = "futures-task"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]]
name = "futures-util"
version = "0.3.29"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
@ -1025,6 +1068,7 @@ dependencies = [
"async-std",
"atom_syndication",
"contrast",
"futures",
"hex",
"md-5",
"napi",

View File

@ -25,6 +25,7 @@ rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4"] }
async-std = "1.12.0"
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] }
futures = "0.3.30"
[build-dependencies]
napi-build = "2"

View File

@ -20,7 +20,7 @@ pub struct FeedItem {
pub pubdate: Option<String>,
pub summary: Option<String>,
pub author: Option<String>,
pub hash_id: Option<String>,
pub hash_id: String,
}
#[derive(Serialize, Debug, Deserialize)]
@ -71,7 +71,8 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
.or(item.link.clone())
.or(item.title.clone())
.and_then(|f| hash_id(f).ok())
.and_then(|f| Some(format!("md5:{}", f))),
.and_then(|f| Some(format!("md5:{}", f)))
.unwrap(), // TODO: Handle error
})
.collect(),
}
@ -118,7 +119,7 @@ fn parse_feed_to_js_result(feed: &Feed) -> JsRssChannel {
.map(|date| date.to_rfc2822()),
summary: item.summary().map(|v| v.value.clone()),
author: authors_to_string(item.authors()),
hash_id: hash_id(item.id.clone()).ok(),
hash_id: hash_id(item.id.clone()).unwrap(),
})
.collect(),
}

View File

@ -1,10 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::future;
use crate::util::QueueWithBackoff;
use std::time::{Duration, Instant};
use napi::bindgen_prelude::{Error as JsError, Status};
use napi::bindgen_prelude::Error as JsError;
use napi::tokio::sync::RwLock;
use std::sync::Arc;
use uuid::Uuid;
use futures::future::{Future, select_all};
use crate::feeds::parser::{js_read_feed, ReadFeedOptions};
use crate::stores::memory::MemoryStorageProvider;
use crate::stores::traits::StorageProvider;
@ -51,7 +53,6 @@ pub struct FeedReader {
queue: QueueWithBackoff,
feeds_to_retain: HashSet<String>,
cache_times: Arc<RwLock<HashMap<String, CacheTime>>>,
storage_provider: Box<impl StorageProvider>,
poll_interval_seconds: f64,
poll_concurrency: u8,
poll_timeout_seconds: i64,
@ -70,14 +71,12 @@ impl FeedReader {
pub fn new(poll_interval_seconds: f64, poll_concurrency: u8, poll_timeout_seconds: i64) -> Self {
let mut cache_times: HashMap<String, CacheTime> = HashMap::new();
let mut lock = Arc::new(RwLock::new(cache_times));
let mut storage_provider = MemoryStorageProvider::new();
FeedReader {
queue: QueueWithBackoff::new(
BACKOFF_TIME_MS,
BACKOFF_POW,
BACKOFF_TIME_MAX_MS,
),
storage_provider,
feeds_to_retain: HashSet::new(),
poll_interval_seconds,
poll_concurrency,
@ -105,7 +104,7 @@ impl FeedReader {
}
async fn poll_feed(&self, url: &String, cache_times: Arc<RwLock<HashMap<String, CacheTime>>>) -> Result<Option<HookshotFeedInfo>, JsError> {
async fn poll_feed(&self, url: &String, cache_times: Arc<RwLock<HashMap<String, CacheTime>>>, mut storage: &impl StorageProvider) -> Result<Option<HookshotFeedInfo>, JsError> {
let seen_entries_changed = false;
let fetch_key = Uuid::new_v4().to_string();
@ -128,13 +127,17 @@ impl FeedReader {
});
drop(c_t_w);
let initial_sync = false; // TODO: Implement
let seen_items: HashSet<String> = HashSet::new(); // TODO: Implement
let initial_sync = storage.has_seen_feed(url).await;
let mut new_guids: Vec<String> = Vec::new();
let new_entries: Vec<HookshotFeedEntry> = Vec::new();
if let Some(feed) = result.feed {
let items = feed.items.iter().map(|x| x.hash_id.clone()).collect::<Vec<_>>();
let seen_items = storage.has_seen_feed_guids(
url,
&items,
).await;
println!("Got feed result!");
let mut feed_info = HookshotFeedInfo {
title: feed.title,
@ -144,26 +147,25 @@ impl FeedReader {
};
for item in feed.items {
println!("Got feed result! {:?}", item);
if let Some(hash_id) = item.hash_id {
if seen_items.contains(&hash_id) {
continue;
}
// TODO: Drop unwrap
new_guids.push(hash_id);
if initial_sync {
// Skip.
continue;
}
feed_info.entries.push(HookshotFeedEntry {
title: item.title,
pubdate: item.pubdate,
summary: item.summary,
author: item.author,
link: item.link,
});
if seen_items.contains(&item.hash_id) {
continue;
}
new_guids.push(item.hash_id.clone());
if initial_sync {
// Skip.
continue;
}
feed_info.entries.push(HookshotFeedEntry {
title: item.title,
pubdate: item.pubdate,
summary: item.summary,
author: item.author,
link: item.link,
});
}
storage.store_feed_guids(&url, &new_guids).await;
return Ok(Some(feed_info));
} else {
// TODO: Implement
@ -178,20 +180,33 @@ impl FeedReader {
return (self.poll_interval_seconds * 1000.0) / self.queue.length() as f64;
}
async unsafe fn poll_feed_int(&mut self, url: &String, mut storage_provider: &impl StorageProvider) {
let mut sleep_for = self.sleeping_interval();
self.feeds_to_retain.insert(url.clone());
let now = Instant::now();
let result = self.poll_feed(url, self.cache_times.clone(), &storage_provider).await;
self.feeds_to_retain.remove(url);
let elapsed = now.elapsed();
sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0);
async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await;
}
#[napi]
pub async unsafe fn poll_feeds(&mut self) -> Result<(), JsError> {
let mut sleep_for = self.sleeping_interval();
if let Some(url) = self.queue.pop() {
self.feeds_to_retain.insert(url.clone());
let now = Instant::now();
let result = self.poll_feed(&url, self.cache_times.clone()).await?;
self.feeds_to_retain.remove(&url);
let elapsed = now.elapsed();
sleep_for = (sleep_for - (elapsed.as_millis() as f64)).max(0.0);
} else {
println!("No feeds available");
let concurrency = self.poll_concurrency as usize;
let mut storage_provider = MemoryStorageProvider::new();
let mut future_set: Vec<_> = Vec::new();
loop {
if let Some(url) = self.queue.pop() {
let result = Box::pin(self.poll_feed_int(&url, &storage_provider));
future_set.push(result);
} else {
async_std::task::sleep(Duration::from_millis(self.sleeping_interval() as u64)).await;
}
if future_set.len() >= concurrency {
let (item_resolved, ready_future_index, _remaining_futures) =
select_all(future_set).await;
}
}
async_std::task::sleep(Duration::from_millis(sleep_for as u64)).await;
Ok(())
}
}

View File

@ -14,23 +14,19 @@ impl MemoryStorageProvider {
}
impl StorageProvider for MemoryStorageProvider {
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) -> Result<(), Err<String>> {
let mut guid_set = self.guids.get(url).or_else(|| {
let new = HashSet::new();
self.guids.insert(url.clone(), new);
Some(&new)
}).unwrap();
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) {
let guid_set = self.guids.entry(url.clone()).or_insert(HashSet::default());
for guid in guids {
guid_set.insert(guid.clone());
}
Ok(())
}
async fn has_seen_feed(&self, url: &String, guids: &Vec<String>) -> Result<bool, Err<String>> {
Ok(self.guids.contains_key(url))
async fn has_seen_feed(&self, url: &String) -> bool {
self.guids.contains_key(url)
}
async fn has_seen_feed_guids(&self,url: &String, guids: &Vec<String>) -> Result<Vec<String>, Err<String>> {
async fn has_seen_feed_guids(&self,url: &String, guids: &Vec<String>) -> Vec<String> {
let mut seen_guids = Vec::default();
if let Some(existing_guids) = self.guids.get(url) {
for guid in guids {
@ -39,6 +35,6 @@ impl StorageProvider for MemoryStorageProvider {
}
}
}
Ok(seen_guids)
seen_guids
}
}

View File

@ -1,5 +1,5 @@
trait StorageProvider {
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>) -> Result<Ok, Err<String>>;
async fn has_seen_feed(&self, url: &String, guids: &Vec<String>) -> Result<bool, Err<String>>;
async fn has_seen_feed_guids(&self, url: &String, guids: &Vec<String>) -> Result<Vec<String>, Err<String>>;
pub trait StorageProvider {
async fn store_feed_guids(&mut self, url: &String, guids: &Vec<String>);
async fn has_seen_feed(&self, url: &String) -> bool;
async fn has_seen_feed_guids(&self, url: &String, guids: &Vec<String>) -> Vec<String>;
}