mirror of
https://github.com/matrix-org/matrix-hookshot.git
synced 2025-03-10 13:17:08 +00:00
Enable feeds to be polled concurrently (#779)
* Add support for concurrent RSS feed reading * Add feed concurrency polling * Actually load all workers * Ensure feed just sleeps if there are no urls to poll * Fix map * Update default config * Update src/feeds/FeedReader.ts Co-authored-by: Andrew Ferrazzutti <andrewf@element.io> * Update src/feeds/FeedReader.ts Co-authored-by: Andrew Ferrazzutti <andrewf@element.io> --------- Co-authored-by: Andrew Ferrazzutti <andrewf@element.io>
This commit is contained in:
parent
981fdb6631
commit
172ae30a73
1
changelog.d/779.feature
Normal file
1
changelog.d/779.feature
Normal file
@ -0,0 +1 @@
|
||||
Feeds are now polled concurrently (defaulting to 4 feeds at a time).
|
@ -86,6 +86,7 @@ feeds:
|
||||
# (Optional) Configure this to enable RSS/Atom feed support
|
||||
|
||||
enabled: false
|
||||
pollConcurrency: 4
|
||||
pollIntervalSeconds: 600
|
||||
pollTimeoutSeconds: 30
|
||||
provisioning:
|
||||
|
@ -238,7 +238,8 @@ export class BridgeConfigGitLab {
|
||||
|
||||
export interface BridgeConfigFeedsYAML {
|
||||
enabled: boolean;
|
||||
pollIntervalSeconds: number;
|
||||
pollIntervalSeconds?: number;
|
||||
pollConcurrency?: number;
|
||||
pollTimeoutSeconds?: number;
|
||||
}
|
||||
|
||||
@ -246,10 +247,12 @@ export class BridgeConfigFeeds {
|
||||
public enabled: boolean;
|
||||
public pollIntervalSeconds: number;
|
||||
public pollTimeoutSeconds: number;
|
||||
public pollConcurrency: number;
|
||||
|
||||
constructor(yaml: BridgeConfigFeedsYAML) {
|
||||
this.enabled = yaml.enabled;
|
||||
this.pollIntervalSeconds = yaml.pollIntervalSeconds;
|
||||
this.pollConcurrency = yaml.pollConcurrency ?? 4;
|
||||
this.pollIntervalSeconds = yaml.pollIntervalSeconds ?? 600;
|
||||
assert.strictEqual(typeof this.pollIntervalSeconds, "number");
|
||||
this.pollTimeoutSeconds = yaml.pollTimeoutSeconds ?? 30;
|
||||
assert.strictEqual(typeof this.pollTimeoutSeconds, "number");
|
||||
|
@ -122,6 +122,8 @@ export const DefaultConfigRoot: BridgeConfigRoot = {
|
||||
feeds: {
|
||||
enabled: false,
|
||||
pollIntervalSeconds: 600,
|
||||
pollTimeoutSeconds: 30,
|
||||
pollConcurrency: 4,
|
||||
},
|
||||
provisioning: {
|
||||
secret: "!secretToken"
|
||||
|
@ -108,7 +108,6 @@ function shuffle<T>(array: T[]): T[] {
|
||||
return array;
|
||||
}
|
||||
|
||||
|
||||
export class FeedReader {
|
||||
/**
|
||||
* Read a feed URL and parse it into a set of items.
|
||||
@ -189,7 +188,9 @@ export class FeedReader {
|
||||
log.debug('Loaded feed URLs:', this.observedFeedUrls);
|
||||
|
||||
void this.loadSeenEntries().then(() => {
|
||||
return this.pollFeeds();
|
||||
for (let i = 0; i < config.pollConcurrency; i++) {
|
||||
void this.pollFeeds(i);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -365,40 +366,44 @@ export class FeedReader {
|
||||
/**
|
||||
* Start polling all the feeds.
|
||||
*/
|
||||
public async pollFeeds(): Promise<void> {
|
||||
log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds`);
|
||||
|
||||
const fetchingStarted = Date.now();
|
||||
|
||||
const [ url ] = this.feedQueue.splice(0, 1);
|
||||
|
||||
if (url) {
|
||||
if (await this.pollFeed(url)) {
|
||||
await this.saveSeenEntries();
|
||||
}
|
||||
}
|
||||
public async pollFeeds(workerId: number): Promise<void> {
|
||||
|
||||
// Update on each iteration
|
||||
Metrics.feedsFailing.set({ reason: "http" }, this.feedsFailingHttp.size );
|
||||
Metrics.feedsFailing.set({ reason: "parsing" }, this.feedsFailingParsing.size);
|
||||
Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size );
|
||||
Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size);
|
||||
|
||||
const elapsed = Date.now() - fetchingStarted;
|
||||
Metrics.feedFetchMs.set(elapsed);
|
||||
Metrics.feedsFetchMsDeprecated.set(elapsed);
|
||||
log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`);
|
||||
|
||||
const sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
|
||||
log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
|
||||
const fetchingStarted = Date.now();
|
||||
|
||||
if (elapsed > this.sleepingInterval) {
|
||||
log.warn(`It took us longer to update the feeds than the configured pool interval`);
|
||||
const [ url ] = this.feedQueue.splice(0, 1);
|
||||
let sleepFor = this.sleepingInterval;
|
||||
|
||||
if (url) {
|
||||
if (await this.pollFeed(url)) {
|
||||
await this.saveSeenEntries();
|
||||
}
|
||||
const elapsed = Date.now() - fetchingStarted;
|
||||
Metrics.feedFetchMs.set(elapsed);
|
||||
Metrics.feedsFetchMsDeprecated.set(elapsed);
|
||||
sleepFor = Math.max(this.sleepingInterval - elapsed, 0);
|
||||
log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`);
|
||||
|
||||
if (elapsed > this.sleepingInterval) {
|
||||
log.warn(`It took us longer to update the feeds than the configured pool interval`);
|
||||
}
|
||||
} else {
|
||||
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
|
||||
log.debug(`No feeds available to poll for worker ${workerId}`);
|
||||
}
|
||||
|
||||
this.timeout = setTimeout(() => {
|
||||
if (!this.shouldRun) {
|
||||
return;
|
||||
}
|
||||
void this.pollFeeds();
|
||||
void this.pollFeeds(workerId);
|
||||
}, sleepFor);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user