mirror of https://github.com/matrix-org/matrix-hookshot.git
synced 2025-03-10 13:17:08 +00:00

Experimental new bucket-style queues.

This commit is contained in:
parent b8d33f42bb
commit 200367ff85

@@ -14,6 +14,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
     private storedFiles = new QuickLRU<string, string>({ maxSize: 128 });
     private gitlabDiscussionThreads = new Map<string, SerializedGitlabDiscussionThreads>();
     private feedGuids = new Map<string, Array<string>>();
+    private feedQueries: {[url: string]: number} = {};
+    private feedQueriesResetTime = 0;
 
     constructor() {
         super();
@@ -29,6 +31,7 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
         while (set.length > MAX_FEED_ITEMS) {
             set.pop();
         }
+        this.feedQueries[url] = (this.feedQueries[url] ?? 0) + 1
     }
 
     async hasSeenFeed(url: string): Promise<boolean> {
@@ -108,4 +111,14 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
     public async setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void> {
         this.gitlabDiscussionThreads.set(connectionId, value);
     }
+
+    public async resetFeedQueryCount() {
+        const scores = {...this.feedQueries};
+        this.feedQueriesResetTime = Date.now();
+        this.feedQueries = {};
+        return {
+            scores,
+            lastQueryTime: this.feedQueriesResetTime,
+        };
+    }
 }
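
The in-memory provider above simply counts how many times new items were stored for each feed URL, and resetFeedQueryCount() hands those counts back and clears them. A small usage sketch, written against the interface added later in this commit; the function name and feed URL are illustrative, not part of hookshot:

async function demoQueryCount(storage: {
    storeFeedGuids(url: string, ...guids: string[]): Promise<void>,
    resetFeedQueryCount(): Promise<{ scores: Record<string, number>, lastQueryTime: number }>,
}) {
    const url = "https://example.org/feed.xml"; // hypothetical feed URL
    // Each store of new items bumps the feed's score by one.
    await storage.storeFeedGuids(url, "guid-1");
    await storage.storeFeedGuids(url, "guid-2");
    const first = await storage.resetFeedQueryCount();
    console.log(first.scores[url]);       // 2
    // The reset starts a fresh counting window.
    const second = await storage.resetFeedQueryCount();
    console.log(second.scores[url] ?? 0); // 0
}
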
@@ -28,6 +28,8 @@ const WIDGET_TOKENS = "widgets.tokens.";
 const WIDGET_USER_TOKENS = "widgets.user-tokens.";
 
 const FEED_GUIDS = "feeds.guids.";
+const FEED_QUERY_SCORE = "feeds.queryscore";
+const FEED_QUERY_SCORE_RESET_TIME = "feeds.queryscore.resettime";
 
 const log = new Logger("RedisASProvider");
 
@@ -219,6 +221,7 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
         const feedKey = `${FEED_GUIDS}${url}`;
         await this.redis.lpush(feedKey, ...guids);
         await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
+        await this.redis.zadd(FEED_QUERY_SCORE, "INCR", 1, url);
     }
 
     public async hasSeenFeed(url: string): Promise<boolean> {
@@ -239,4 +242,21 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
         }
         return guids.filter((_guid, index) => res[index][1] !== null);
     }
+
+    public async resetFeedQueryCount() {
+        const scores: {[key: string]: number} = {};
+        const data = await this.redis.zrangebyscore(FEED_QUERY_SCORE, "-inf", "+inf", "WITHSCORES");
+        for (let index = 0; index < data.length; index += 2) {
+            const key = data[index];
+            const value = parseInt(data[index+1]);
+            scores[key] = value;
+        }
+        const lastQueryTime = parseInt((await this.redis.get(FEED_QUERY_SCORE_RESET_TIME)) ?? "0");
+        await this.redis.del(FEED_QUERY_SCORE);
+        await this.redis.set(FEED_QUERY_SCORE_RESET_TIME, Date.now());
+        return {
+            scores,
+            lastQueryTime,
+        };
+    }
 }
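
On the Redis side the same counter lives in a sorted set: ZADD with INCR adds one to the URL's score on every store, and ZRANGEBYSCORE with WITHSCORES returns a flat array alternating member and score, which the loop above walks two entries at a time. A standalone sketch of that reply shape and the pairing step, with illustrative values:

// ZRANGEBYSCORE ... WITHSCORES replies with [member1, score1, member2, score2, ...].
// The values below are illustrative.
const data = [
    "https://example.org/a.xml", "3",
    "https://example.org/b.xml", "12",
];
const scores: {[key: string]: number} = {};
for (let index = 0; index < data.length; index += 2) {
    scores[data[index]] = parseInt(data[index + 1], 10);
}
console.log(scores); // { "https://example.org/a.xml": 3, "https://example.org/b.xml": 12 }
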
@@ -28,4 +28,8 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
     storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
     hasSeenFeed(url: string): Promise<boolean>;
     hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
+    resetFeedQueryCount(): Promise<{
+        scores: Record<string, number>,
+        lastQueryTime: number
+    }>;
 }
@@ -83,14 +83,34 @@ export class FeedReader {
 
     private connections: FeedConnection[];
 
-    private feedQueue = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
+    /**
+     * Poll [1] every time.
+     * Poll [2] every 4th time.
+     * Poll [3] every 8th time.
+     * Poll [4] every 16th time.
+     *
+     * Every N hours, move between buckets.
+     */
+
+    private readonly intakeBucket = new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS);
+
+    // TODO: Make this configurable.
+    private readonly feedBuckets = new Map([
+        [0, this.intakeBucket], // < feedInterval
+        [4, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 4xfeedInvterval
+        [8, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // < 8xfeedInvterval
+        [16, new QueueWithBackoff(BACKOFF_TIME_MS, BACKOFF_POW, BACKOFF_TIME_MAX_MS)], // rest
+    ]);
+    private readonly lastBucket = 16;
+    private bucketIteration = 0;
+
 
     // A set of last modified times for each url.
-    private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
+    private readonly cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
 
     // Reason failures to url map.
-    private feedsFailingHttp = new Set();
-    private feedsFailingParsing = new Set();
+    private readonly feedsFailingHttp = new Set();
+    private readonly feedsFailingParsing = new Set();
 
     static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";
 
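
The comment block above sketches the intent: the intake bucket (key 0) is visited on every polling pass, the 4 bucket only every 4th pass, and so on, so feeds that rarely produce new items are polled less often. A rough worked example of that cadence, assuming an illustrative 2-second gap between passes (this illustrates the stated intent only; the actual selection code appears further down in pollFeeds):

// Intended cadence from the comment above, with an illustrative 2s gap per pass.
const passGapMs = 2_000;
for (const bucketKey of [0, 4, 8, 16]) {
    const passesBetweenVisits = bucketKey === 0 ? 1 : bucketKey;
    console.log(bucketKey, passesBetweenVisits * passGapMs);
    // 0 → 2000ms, 4 → 8000ms, 8 → 16000ms, 16 → 32000ms
}
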
@@ -98,10 +118,14 @@ export class FeedReader {
     private readonly timeouts: (NodeJS.Timeout|undefined)[];
     private readonly feedsToRetain = new Set();
 
+    get feedCount() {
+        return Object.values(this.feedBuckets).map(b => b.length()).reduce((a,b) => a+b, 0);
+    }
+
     get sleepingInterval() {
         return (
             // Calculate the number of MS to wait in between feeds.
-            (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length() || 1)
+            (this.config.pollIntervalSeconds * 1000) / (this.feedCount || 1)
             // And multiply by the number of concurrent readers
         ) * this.config.pollConcurrency;
     }
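
sleepingInterval spreads the configured poll interval across every known feed and then scales by the number of concurrent workers, so each worker waits roughly (pollIntervalSeconds × 1000 / feedCount) × pollConcurrency milliseconds between feeds. A worked example with illustrative numbers (not hookshot defaults):

// 60s poll interval, 120 known feeds, 4 concurrent workers.
const pollIntervalSeconds = 60;
const feedCount = 120;
const pollConcurrency = 4;
const sleepingInterval = ((pollIntervalSeconds * 1000) / (feedCount || 1)) * pollConcurrency;
console.log(sleepingInterval); // 2000 (each worker sleeps ~2s between feeds)

With four workers each taking a feed every two seconds, all 120 feeds are covered about once per 60-second window.
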
@@ -125,7 +149,7 @@ export class FeedReader {
         const normalisedUrl = normalizeUrl(newConnection.feedUrl);
         if (!feeds.has(normalisedUrl)) {
             log.info(`Connection added, adding "${normalisedUrl}" to queue`);
-            this.feedQueue.push(normalisedUrl);
+            this.intakeBucket.push(normalisedUrl);
             feeds.add(normalisedUrl);
             Metrics.feedsCount.inc();
             Metrics.feedsCountDeprecated.inc();
@@ -151,7 +175,7 @@ export class FeedReader {
         }
         log.info(`Connection removed, removing "${normalisedUrl}" from queue`);
         this.feedsToRetain.delete(normalisedUrl);
-        this.feedQueue.remove(normalisedUrl);
+        Object.values(this.feedBuckets).some(bucket => bucket.remove(normalisedUrl));
         feeds.delete(normalisedUrl);
         this.feedsFailingHttp.delete(normalisedUrl);
         this.feedsFailingParsing.delete(normalisedUrl);
@@ -164,6 +188,61 @@ export class FeedReader {
         for (let i = 0; i < config.pollConcurrency; i++) {
             void this.pollFeeds(i);
         }
+
+        this.configureBucketInterval();
+    }
+
+    /**
+     * Resort feeds into the correct queues based on how
+     * often they return new values.
+     */
+    public configureBucketInterval() {
+        // TOOD: Run this function on startup to immediate sort into buckets.
+        const bucketEntries = [...this.feedBuckets.entries()];
+        setInterval(async () => {
+            const { scores, lastQueryTime } = await this.storage.resetFeedQueryCount();
+            if (lastQueryTime === 0) {
+                log.debug(`Skipping first bucket interval check, not enough data present.`);
+                // Skip, not enough data.
+                return;
+            }
+            const timePassed = Date.now() - lastQueryTime;
+            // Determine a reasonable approximation of how much the feed may have been polled in the
+            // last time period.
+            const maximumFastestPotential = timePassed / this.sleepingInterval;
+            // TODO: This should be all feeds, so we can catch any that don't even score...
+            for (const [url, pollCount] of Object.entries(scores)) {
+                // Remove current bucket
+                const feedCurrentBucket = bucketEntries.find((b) => b[1].remove(url));
+                if (!feedCurrentBucket) {
+                    log.warn(`Feed ${url} was not found in any buckets!!!`);
+                    continue;
+                }
+                // E.g. Bucket 1 polls only 4 times as often as bucket 0, so we want to ensure we weight these fairly.
+                const maximumPotential = maximumFastestPotential * (1 / (feedCurrentBucket[0] || 1));
+                log.debug(`Determining new bucket for ${url}. Maximum possible poll rate would be ${maximumPotential}. Score was ${pollCount}`);
+
+                // Determine new bucket
+                for (let bIndex = 0; bIndex < bucketEntries.length; bIndex++) {
+                    const currentBucket = bucketEntries[bIndex];
+                    const nextBucket = bucketEntries[bIndex+1];
+                    if (!nextBucket) {
+                        // We're at the end, just push to the last bucket because this is a sad feed that didn't
+                        // get any items.
+                        currentBucket[1].push(url);
+                        break;
+                    }
+                    // ((16 - (4 / 16)) / 16)*50
+                    const nextBucketMinSuccessRate = (1 - (nextBucket[0] / this.lastBucket))*maximumPotential;
+                    log.debug(`Min success rate target for bucket ${nextBucket[0]} is ${nextBucketMinSuccessRate}`);
+                    if (pollCount >= nextBucketMinSuccessRate) {
+                        log.debug(`Adding feed ${url} to ${currentBucket[0]}`);
+                        currentBucket[1].push(url);
+                        break;
+                    }
+                }
+            }
+        }, 30000);
     }
 
     public stop() {
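
The promotion logic above works out how many times a feed could possibly have been polled in the window (timePassed / sleepingInterval), divides that by the feed's current bucket key (treating the intake bucket as 1), and then requires a score of at least (1 - nextBucket / lastBucket) × maximumPotential; the feed lands in the bucket just before the first threshold its score clears. A worked example of those thresholds, with illustrative numbers:

// Illustrative numbers: a 30s window, 2s sleepingInterval, feed currently in the intake bucket (key 0).
const timePassed = 30_000;
const sleepingInterval = 2_000;
const lastBucket = 16;
const maximumFastestPotential = timePassed / sleepingInterval;                    // 15
const currentBucketKey = 0;
const maximumPotential = maximumFastestPotential * (1 / (currentBucketKey || 1)); // 15
for (const nextBucketKey of [4, 8, 16]) {
    const nextBucketMinSuccessRate = (1 - (nextBucketKey / lastBucket)) * maximumPotential;
    console.log(nextBucketKey, nextBucketMinSuccessRate); // 4 → 11.25, 8 → 7.5, 16 → 0
}

With these numbers a feed needs a score of at least 11.25 to stay in the intake bucket and at least 7.5 to settle in the 4 bucket; lower scores fall through to the slower buckets.
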
@@ -185,7 +264,7 @@ export class FeedReader {
                 log.error(`Invalid feedUrl for connection ${conn.connectionId}: ${conn.feedUrl}. It will not be tracked`);
             }
         }
-        this.feedQueue.populate([...observedFeedUrls]);
+        this.intakeBucket.populate([...observedFeedUrls]);
         Metrics.feedsCount.set(observedFeedUrls.size);
         Metrics.feedsCountDeprecated.set(observedFeedUrls.size);
         return observedFeedUrls;
@@ -199,7 +278,7 @@ export class FeedReader {
      * @param url The URL to be polled.
      * @returns A boolean that returns if we saw any changes on the feed since the last poll time.
      */
-    public async pollFeed(url: string): Promise<boolean> {
+    public async pollFeed(url: string, feedQueue: QueueWithBackoff): Promise<boolean> {
         // If a feed is deleted while it is being polled, we need
         // to remember NOT to add it back to the queue. This
         // set keeps track of all the feeds that *should* be
@@ -236,6 +315,7 @@ export class FeedReader {
             // If undefined, we got a not-modified.
             log.debug(`Found ${feed.items.length} entries in ${url}`);
             const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
+            log.debug(`Seen ${seenItems.join(', ')} entries for ${url} already`);
             for (const item of feed.items) {
                 // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
                 if (!item.hashId) {
@@ -280,7 +360,7 @@ export class FeedReader {
             this.feedsFailingParsing.delete(url);
             if (this.feedsToRetain.has(url)) {
                 // If we've removed this feed since processing it, do not requeue.
-                this.feedQueue.push(url);
+                feedQueue.push(url);
             }
         } catch (err: unknown) {
             // TODO: Proper Rust Type error.
@@ -289,7 +369,7 @@ export class FeedReader {
             } else {
                 this.feedsFailingParsing.add(url);
             }
-            const backoffDuration = this.feedQueue.backoff(url);
+            const backoffDuration = feedQueue.backoff(url);
             const error = err instanceof Error ? err : new Error(`Unknown error ${err}`);
             const feedError = new FeedError(url.toString(), error, fetchKey);
             log.error("Unable to read feed:", feedError.message, `backing off for ${backoffDuration}ms`);
@@ -309,29 +389,44 @@ export class FeedReader {
         Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
         Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);
 
-        log.debug(`Checking for updates in ${this.feedQueue.length()} RSS/Atom feeds (worker: ${workerId})`);
+        log.debug(`Checking for updates in ${this.feedCount} RSS/Atom feeds (worker: ${workerId})`);
 
+        // TODO: Dehardcode
+        if (this.bucketIteration > this.lastBucket) {
+            this.bucketIteration = 0;
+        }
+        this.bucketIteration++;
+        const queuesToPoll = [...this.feedBuckets.entries()].filter(([bucketInterval]) => bucketInterval % this.bucketIteration === 0);
+
         const fetchingStarted = Date.now();
 
-        const url = this.feedQueue.pop();
-        let sleepFor = this.sleepingInterval;
-
-        if (url) {
-            if (await this.pollFeed(url)) {
-                log.debug(`Feed changed and will be saved`);
-            }
-            const elapsed = Date.now() - fetchingStarted;
-            Metrics.feedFetchMs.set(elapsed);
-            Metrics.feedsFetchMsDeprecated.set(elapsed);
-            sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
-            log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
-
-            if (elapsed > this.sleepingInterval) {
-                log.warn(`It took us longer to update the feeds than the configured pool interval`);
-            }
+        let sleepFor = 0;
+        for (const [queueNumber, queue] of queuesToPoll) {
+            log.debug(`Fetching from queue ${queueNumber}`);
+            const url = queue.pop();
+            if (url) {
+                if (await this.pollFeed(url, queue)) {
+                    log.debug(`Feed changed and will be saved`);
+                }
+                const elapsed = Date.now() - fetchingStarted;
+                log.debug(`Feed fetching took ${elapsed / 1000}s,`);
+                Metrics.feedFetchMs.set(elapsed);
+                Metrics.feedsFetchMsDeprecated.set(elapsed);
+                sleepFor += Math.max(this.sleepingInterval - elapsed, 0);
+            } else {
+                // It is possible that we have more workers than feeds. This will cause the worker to just sleep.
+                log.debug(`No feeds available to poll for worker ${workerId} on this bucket`);
+                // So we don't tightloop
+                if (sleepFor === 0) {
+                    sleepFor += this.sleepingInterval;
+                }
+            }
+        }
+
+        if (this.sleepingInterval === 0) {
+            log.warn(`It took us longer to update the feeds than the configured pool interval`);
         } else {
-            // It is possible that we have more workers than feeds. This will cause the worker to just sleep.
-            log.debug(`No feeds available to poll for worker ${workerId}`);
+            log.debug(`Sleeping for ${sleepFor / 1000}s`);
        }
 
         this.timeouts[workerId] = setTimeout(() => {
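
Each pass of pollFeeds advances bucketIteration (wrapping once it exceeds lastBucket) and then polls the buckets whose key is an exact multiple of the current iteration value; the intake bucket (key 0) always qualifies because 0 % n === 0. A standalone sketch (not from the commit) that replays that filter for the first few passes, using the bucket keys from this diff:

const bucketKeys = [0, 4, 8, 16];
const lastBucket = 16;
let bucketIteration = 0;
for (let pass = 1; pass <= 8; pass++) {
    if (bucketIteration > lastBucket) {
        bucketIteration = 0;
    }
    bucketIteration++;
    const selected = bucketKeys.filter((bucketInterval) => bucketInterval % bucketIteration === 0);
    console.log(pass, selected);
    // pass 1 → [0, 4, 8, 16], pass 3 → [0], pass 4 → [0, 4, 8, 16], pass 8 → [0, 8, 16]
}

How often a given bucket is actually drained therefore depends on how its key lines up with the iteration counter.
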