diff --git a/changelog.d/779.feature b/changelog.d/779.feature new file mode 100644 index 00000000..e60bfc42 --- /dev/null +++ b/changelog.d/779.feature @@ -0,0 +1 @@ +Feeds are now polled concurrently (defaulting to 4 feeds at a time). diff --git a/config.sample.yml b/config.sample.yml index 3f3f4a37..846494de 100644 --- a/config.sample.yml +++ b/config.sample.yml @@ -86,6 +86,7 @@ feeds: # (Optional) Configure this to enable RSS/Atom feed support enabled: false + pollConcurrency: 4 pollIntervalSeconds: 600 pollTimeoutSeconds: 30 provisioning: diff --git a/src/config/Config.ts b/src/config/Config.ts index bd106aff..bee5942d 100644 --- a/src/config/Config.ts +++ b/src/config/Config.ts @@ -238,7 +238,8 @@ export class BridgeConfigGitLab { export interface BridgeConfigFeedsYAML { enabled: boolean; - pollIntervalSeconds: number; + pollIntervalSeconds?: number; + pollConcurrency?: number; pollTimeoutSeconds?: number; } @@ -246,10 +247,12 @@ export class BridgeConfigFeeds { public enabled: boolean; public pollIntervalSeconds: number; public pollTimeoutSeconds: number; + public pollConcurrency: number; constructor(yaml: BridgeConfigFeedsYAML) { this.enabled = yaml.enabled; - this.pollIntervalSeconds = yaml.pollIntervalSeconds; + this.pollConcurrency = yaml.pollConcurrency ?? 4; + this.pollIntervalSeconds = yaml.pollIntervalSeconds ?? 600; assert.strictEqual(typeof this.pollIntervalSeconds, "number"); this.pollTimeoutSeconds = yaml.pollTimeoutSeconds ?? 30; assert.strictEqual(typeof this.pollTimeoutSeconds, "number"); diff --git a/src/config/Defaults.ts b/src/config/Defaults.ts index cb1f4692..0d12884b 100644 --- a/src/config/Defaults.ts +++ b/src/config/Defaults.ts @@ -122,6 +122,8 @@ export const DefaultConfigRoot: BridgeConfigRoot = { feeds: { enabled: false, pollIntervalSeconds: 600, + pollTimeoutSeconds: 30, + pollConcurrency: 4, }, provisioning: { secret: "!secretToken" diff --git a/src/feeds/FeedReader.ts b/src/feeds/FeedReader.ts index c0d3d4d2..3c07cc77 100644 --- a/src/feeds/FeedReader.ts +++ b/src/feeds/FeedReader.ts @@ -108,7 +108,6 @@ function shuffle(array: T[]): T[] { return array; } - export class FeedReader { /** * Read a feed URL and parse it into a set of items. @@ -189,7 +188,9 @@ export class FeedReader { log.debug('Loaded feed URLs:', this.observedFeedUrls); void this.loadSeenEntries().then(() => { - return this.pollFeeds(); + for (let i = 0; i < config.pollConcurrency; i++) { + void this.pollFeeds(i); + } }); } @@ -365,40 +366,44 @@ export class FeedReader { /** * Start polling all the feeds. */ - public async pollFeeds(): Promise { - log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds`); - - const fetchingStarted = Date.now(); - - const [ url ] = this.feedQueue.splice(0, 1); - - if (url) { - if (await this.pollFeed(url)) { - await this.saveSeenEntries(); - } - } + public async pollFeeds(workerId: number): Promise { + // Update on each iteration Metrics.feedsFailing.set({ reason: "http" }, this.feedsFailingHttp.size ); Metrics.feedsFailing.set({ reason: "parsing" }, this.feedsFailingParsing.size); Metrics.feedsFailingDeprecated.set({ reason: "http" }, this.feedsFailingHttp.size ); Metrics.feedsFailingDeprecated.set({ reason: "parsing" }, this.feedsFailingParsing.size); - const elapsed = Date.now() - fetchingStarted; - Metrics.feedFetchMs.set(elapsed); - Metrics.feedsFetchMsDeprecated.set(elapsed); + log.debug(`Checking for updates in ${this.observedFeedUrls.size} RSS/Atom feeds (worker: ${workerId})`); - const sleepFor = Math.max(this.sleepingInterval - elapsed, 0); - log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`); + const fetchingStarted = Date.now(); - if (elapsed > this.sleepingInterval) { - log.warn(`It took us longer to update the feeds than the configured pool interval`); + const [ url ] = this.feedQueue.splice(0, 1); + let sleepFor = this.sleepingInterval; + + if (url) { + if (await this.pollFeed(url)) { + await this.saveSeenEntries(); + } + const elapsed = Date.now() - fetchingStarted; + Metrics.feedFetchMs.set(elapsed); + Metrics.feedsFetchMsDeprecated.set(elapsed); + sleepFor = Math.max(this.sleepingInterval - elapsed, 0); + log.debug(`Feed fetching took ${elapsed / 1000}s, sleeping for ${sleepFor / 1000}s`); + + if (elapsed > this.sleepingInterval) { + log.warn(`It took us longer to update the feeds than the configured pool interval`); + } + } else { + // It may be possible that we have more workers than feeds. This will cause the worker to just sleep. + log.debug(`No feeds available to poll for worker ${workerId}`); } this.timeout = setTimeout(() => { if (!this.shouldRun) { return; } - void this.pollFeeds(); + void this.pollFeeds(workerId); }, sleepFor); }