Performance improvements to Feeds (#786)

* Various smaller changes

* Drop account data entirely

* Use max feed items

* Commit known working improvements

* Better status handling

* changelog

* Update changelog

* Add a note on Redis.

* Add proper HTTP tests

* Linty lint

* Tweaks

* New metrics woah

* Tweaks
This commit is contained in:
Will Hunt 2023-06-28 16:29:54 +01:00 committed by GitHub
parent 2173a8ccb9
commit 3217b9eecf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1086 additions and 264 deletions

757
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,7 @@ edition = "2021"
crate-type = ["cdylib"] crate-type = ["cdylib"]
[dependencies] [dependencies]
napi = {version="2", features=["serde-json"]} napi = {version="2", features=["serde-json", "async"]}
napi-derive = "2" napi-derive = "2"
url = "2" url = "2"
serde_json = "1" serde_json = "1"
@ -20,6 +20,7 @@ hex = "0.4.3"
rss = "2.0.3" rss = "2.0.3"
atom_syndication = "0.12" atom_syndication = "0.12"
ruma = { version = "0.8.2", features = ["events", "unstable-sanitize"] } ruma = { version = "0.8.2", features = ["events", "unstable-sanitize"] }
reqwest = "0.11"
[build-dependencies] [build-dependencies]
napi-build = "1" napi-build = "1"

5
changelog.d/786.bugfix Normal file
View File

@ -0,0 +1,5 @@
Refactor Hookshot to use Redis for caching of feed information, massively improving memory usage.
Please note that this is a behavioural change: Hookshots configured to use in-memory caching (not Redis),
will no longer bridge any RSS entries it may have missed during downtime, and will instead perform an initial
sync (not reporting any entries).

View File

@ -70,6 +70,8 @@ Below is the generated list of Prometheus metrics for Hookshot.
| nodejs_eventloop_lag_p50_seconds | The 50th percentile of the recorded event loop delays. | | | nodejs_eventloop_lag_p50_seconds | The 50th percentile of the recorded event loop delays. | |
| nodejs_eventloop_lag_p90_seconds | The 90th percentile of the recorded event loop delays. | | | nodejs_eventloop_lag_p90_seconds | The 90th percentile of the recorded event loop delays. | |
| nodejs_eventloop_lag_p99_seconds | The 99th percentile of the recorded event loop delays. | | | nodejs_eventloop_lag_p99_seconds | The 99th percentile of the recorded event loop delays. | |
| nodejs_active_resources | Number of active resources that are currently keeping the event loop alive, grouped by async resource type. | type |
| nodejs_active_resources_total | Total number of active resources. | |
| nodejs_active_handles | Number of active libuv handles grouped by handle type. Every handle type is C++ class name. | type | | nodejs_active_handles | Number of active libuv handles grouped by handle type. Every handle type is C++ class name. | type |
| nodejs_active_handles_total | Total number of active handles. | | | nodejs_active_handles_total | Total number of active handles. | |
| nodejs_active_requests | Number of active libuv requests grouped by request type. Every request type is C++ class name. | type | | nodejs_active_requests | Number of active libuv requests grouped by request type. Every request type is C++ class name. | type |

View File

@ -19,6 +19,9 @@ Each feed will only be checked once, regardless of the number of rooms to which
No entries will be bridged upon the “initial sync” -- all entries that exist at the moment of setup will be considered to be already seen. No entries will be bridged upon the “initial sync” -- all entries that exist at the moment of setup will be considered to be already seen.
Please note that Hookshot **must** be configured with Redis to retain seen entries between restarts. By default, Hookshot will
run an "initial sync" on each startup and will not process any entries from feeds from before the first sync.
## Usage ## Usage
### Adding new feeds ### Adding new feeds

View File

@ -64,7 +64,7 @@
"node-emoji": "^1.11.0", "node-emoji": "^1.11.0",
"nyc": "^15.1.0", "nyc": "^15.1.0",
"p-queue": "^6.6.2", "p-queue": "^6.6.2",
"prom-client": "^14.0.1", "prom-client": "^14.2.0",
"reflect-metadata": "^0.1.13", "reflect-metadata": "^0.1.13",
"source-map-support": "^0.5.21", "source-map-support": "^0.5.21",
"string-argv": "^0.3.1", "string-argv": "^0.3.1",
@ -76,7 +76,7 @@
}, },
"devDependencies": { "devDependencies": {
"@codemirror/lang-javascript": "^6.0.2", "@codemirror/lang-javascript": "^6.0.2",
"@napi-rs/cli": "^2.2.0", "@napi-rs/cli": "^2.13.2",
"@preact/preset-vite": "^2.2.0", "@preact/preset-vite": "^2.2.0",
"@tsconfig/node18": "^2.0.0", "@tsconfig/node18": "^2.0.0",
"@types/ajv": "^1.0.0", "@types/ajv": "^1.0.0",
@ -105,7 +105,7 @@
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
"sass": "^1.51.0", "sass": "^1.51.0",
"ts-node": "^10.9.1", "ts-node": "^10.9.1",
"typescript": "^5.0.4", "typescript": "^5.1.3",
"vite": "^4.1.5", "vite": "^4.1.5",
"vite-svg-loader": "^4.0.0" "vite-svg-loader": "^4.0.0"
} }

View File

@ -776,8 +776,7 @@ export class Bridge {
this.config.feeds, this.config.feeds,
this.connectionManager, this.connectionManager,
this.queue, this.queue,
// Use default bot when storing account data this.storage,
this.as.botClient,
); );
} }

View File

@ -1,13 +1,14 @@
import {Intent, StateEvent} from "matrix-bot-sdk"; import {Intent, StateEvent} from "matrix-bot-sdk";
import { IConnection, IConnectionState, InstantiateConnectionOpts } from "."; import { IConnection, IConnectionState, InstantiateConnectionOpts } from ".";
import { ApiError, ErrCode } from "../api"; import { ApiError, ErrCode } from "../api";
import { FeedEntry, FeedError, FeedReader} from "../feeds/FeedReader"; import { FeedEntry, FeedError} from "../feeds/FeedReader";
import { Logger } from "matrix-appservice-bridge"; import { Logger } from "matrix-appservice-bridge";
import { BaseConnection } from "./BaseConnection"; import { BaseConnection } from "./BaseConnection";
import markdown from "markdown-it"; import markdown from "markdown-it";
import { Connection, ProvisionConnectionOpts } from "./IConnection"; import { Connection, ProvisionConnectionOpts } from "./IConnection";
import { GetConnectionsResponseItem } from "../provisioning/api"; import { GetConnectionsResponseItem } from "../provisioning/api";
import { sanitizeHtml } from "../libRs"; import { readFeed, sanitizeHtml } from "../libRs";
import UserAgent from "../UserAgent";
const log = new Logger("FeedConnection"); const log = new Logger("FeedConnection");
const md = new markdown({ const md = new markdown({
html: true, html: true,
@ -38,7 +39,7 @@ export interface FeedConnectionSecrets {
export type FeedResponseItem = GetConnectionsResponseItem<FeedConnectionState, FeedConnectionSecrets>; export type FeedResponseItem = GetConnectionsResponseItem<FeedConnectionState, FeedConnectionSecrets>;
const MAX_LAST_RESULT_ITEMS = 5; const MAX_LAST_RESULT_ITEMS = 5;
const VALIDATION_FETCH_TIMEOUT_MS = 5000; const VALIDATION_FETCH_TIMEOUT_S = 5;
const MAX_SUMMARY_LENGTH = 512; const MAX_SUMMARY_LENGTH = 512;
const MAX_TEMPLATE_LENGTH = 1024; const MAX_TEMPLATE_LENGTH = 1024;
@ -68,7 +69,10 @@ export class FeedConnection extends BaseConnection implements IConnection {
} }
try { try {
await FeedReader.fetchFeed(url, {}, VALIDATION_FETCH_TIMEOUT_MS); await readFeed(url, {
userAgent: UserAgent,
pollTimeoutSeconds: VALIDATION_FETCH_TIMEOUT_S,
});
} catch (ex) { } catch (ex) {
throw new ApiError(`Could not read feed from URL: ${ex.message}`, ErrCode.BadValue); throw new ApiError(`Could not read feed from URL: ${ex.message}`, ErrCode.BadValue);
} }

View File

@ -7,33 +7,58 @@ const log = new Logger("Metrics");
export class Metrics { export class Metrics {
public readonly expressRouter = Router(); public readonly expressRouter = Router();
public readonly webhooksHttpRequest = new Counter({ name: "hookshot_webhooks_http_request", help: "Number of requests made to the hookshot webhooks handler", labelNames: ["path", "method"], registers: [this.registry]}); public readonly webhooksHttpRequest;
public readonly provisioningHttpRequest = new Counter({ name: "hookshot_provisioning_http_request", help: "Number of requests made to the hookshot webhooks handler", labelNames: ["path", "method"], registers: [this.registry]}); public readonly provisioningHttpRequest;
public readonly messageQueuePushes = new Counter({ name: "hookshot_queue_event_pushes", help: "Number of events pushed through the queue", labelNames: ["event"], registers: [this.registry]}); public readonly messageQueuePushes;
public readonly connectionsEventFailed = new Counter({ name: "hookshot_connection_event_failed", help: "The number of events that failed to process", labelNames: ["event", "connectionId"], registers: [this.registry]}); public readonly connectionsEventFailed;
public readonly connections = new Gauge({ name: "hookshot_connections", help: "The number of active hookshot connections", labelNames: ["service"], registers: [this.registry]}); public readonly connections;
public readonly notificationsPush = new Counter({ name: "hookshot_notifications_push", help: "Number of notifications pushed", labelNames: ["service"], registers: [this.registry]}); public readonly notificationsPush;
public readonly notificationsServiceUp = new Gauge({ name: "hookshot_notifications_service_up", help: "Is the notification service up or down", labelNames: ["service"], registers: [this.registry]}); public readonly notificationsServiceUp;
public readonly notificationsWatchers = new Gauge({ name: "hookshot_notifications_watchers", help: "Number of notifications watchers running", labelNames: ["service"], registers: [this.registry]}); public readonly notificationsWatchers;
private readonly matrixApiCalls = new Counter({ name: "matrix_api_calls", help: "The number of Matrix client API calls made", labelNames: ["method"], registers: [this.registry]}); private readonly matrixApiCalls;
private readonly matrixApiCallsFailed = new Counter({ name: "matrix_api_calls_failed", help: "The number of Matrix client API calls which failed", labelNames: ["method"], registers: [this.registry]}); private readonly matrixApiCallsFailed;
public readonly matrixAppserviceEvents = new Counter({ name: "matrix_appservice_events", help: "The number of events sent over the AS API", labelNames: [], registers: [this.registry]}); public readonly matrixAppserviceEvents;
public readonly matrixAppserviceDecryptionFailed = new Counter({ name: "matrix_appservice_decryption_failed", help: "The number of events sent over the AS API that failed to decrypt", registers: [this.registry]}); public readonly matrixAppserviceDecryptionFailed;
public readonly feedsCount = new Gauge({ name: "hookshot_feeds_count", help: "The number of RSS feeds that hookshot is subscribed to", labelNames: [], registers: [this.registry]}); public readonly feedsCount;
public readonly feedFetchMs = new Gauge({ name: "hookshot_feeds_fetch_ms", help: "The time taken for hookshot to fetch all feeds", labelNames: [], registers: [this.registry]}); public readonly feedFetchMs;
public readonly feedsFailing = new Gauge({ name: "hookshot_feeds_failing", help: "The number of RSS feeds that hookshot is failing to read", labelNames: ["reason"], registers: [this.registry]}); public readonly feedsFailing;
public readonly feedsCountDeprecated = new Gauge({ name: "feed_count", help: "(Deprecated) The number of RSS feeds that hookshot is subscribed to", labelNames: [], registers: [this.registry]}); public readonly feedsCountDeprecated;
public readonly feedsFetchMsDeprecated = new Gauge({ name: "feed_fetch_ms", help: "(Deprecated) The time taken for hookshot to fetch all feeds", labelNames: [], registers: [this.registry]}); public readonly feedsFetchMsDeprecated;
public readonly feedsFailingDeprecated = new Gauge({ name: "feed_failing", help: "(Deprecated) The number of RSS feeds that hookshot is failing to read", labelNames: ["reason"], registers: [this.registry]}); public readonly feedsFailingDeprecated;
constructor(private registry: Registry = register) { constructor(private registry: Registry = register) {
this.expressRouter.get('/metrics', this.metricsFunc.bind(this)); this.expressRouter.get('/metrics', this.metricsFunc.bind(this));
this.webhooksHttpRequest = new Counter({ name: "hookshot_webhooks_http_request", help: "Number of requests made to the hookshot webhooks handler", labelNames: ["path", "method"], registers: [this.registry]});
this.provisioningHttpRequest = new Counter({ name: "hookshot_provisioning_http_request", help: "Number of requests made to the hookshot webhooks handler", labelNames: ["path", "method"], registers: [this.registry]});
this.messageQueuePushes = new Counter({ name: "hookshot_queue_event_pushes", help: "Number of events pushed through the queue", labelNames: ["event"], registers: [this.registry]});
this.connectionsEventFailed = new Counter({ name: "hookshot_connection_event_failed", help: "The number of events that failed to process", labelNames: ["event", "connectionId"], registers: [this.registry]});
this.connections = new Gauge({ name: "hookshot_connections", help: "The number of active hookshot connections", labelNames: ["service"], registers: [this.registry]});
this.notificationsPush = new Counter({ name: "hookshot_notifications_push", help: "Number of notifications pushed", labelNames: ["service"], registers: [this.registry]});
this.notificationsServiceUp = new Gauge({ name: "hookshot_notifications_service_up", help: "Is the notification service up or down", labelNames: ["service"], registers: [this.registry]});
this.notificationsWatchers = new Gauge({ name: "hookshot_notifications_watchers", help: "Number of notifications watchers running", labelNames: ["service"], registers: [this.registry]});
this.matrixApiCalls = new Counter({ name: "matrix_api_calls", help: "The number of Matrix client API calls made", labelNames: ["method"], registers: [this.registry]});
this.matrixApiCallsFailed = new Counter({ name: "matrix_api_calls_failed", help: "The number of Matrix client API calls which failed", labelNames: ["method"], registers: [this.registry]});
this.matrixAppserviceEvents = new Counter({ name: "matrix_appservice_events", help: "The number of events sent over the AS API", labelNames: [], registers: [this.registry]});
this.matrixAppserviceDecryptionFailed = new Counter({ name: "matrix_appservice_decryption_failed", help: "The number of events sent over the AS API that failed to decrypt", registers: [this.registry]});
this.feedsCount = new Gauge({ name: "hookshot_feeds_count", help: "The number of RSS feeds that hookshot is subscribed to", labelNames: [], registers: [this.registry]});
this.feedFetchMs = new Gauge({ name: "hookshot_feeds_fetch_ms", help: "The time taken for hookshot to fetch all feeds", labelNames: [], registers: [this.registry]});
this.feedsFailing = new Gauge({ name: "hookshot_feeds_failing", help: "The number of RSS feeds that hookshot is failing to read", labelNames: ["reason"], registers: [this.registry]});
this.feedsCountDeprecated = new Gauge({ name: "feed_count", help: "(Deprecated) The number of RSS feeds that hookshot is subscribed to", labelNames: [], registers: [this.registry]});
this.feedsFetchMsDeprecated = new Gauge({ name: "feed_fetch_ms", help: "(Deprecated) The time taken for hookshot to fetch all feeds", labelNames: [], registers: [this.registry]});
this.feedsFailingDeprecated = new Gauge({ name: "feed_failing", help: "(Deprecated) The number of RSS feeds that hookshot is failing to read", labelNames: ["reason"], registers: [this.registry]});
collectDefaultMetrics({ collectDefaultMetrics({
register: this.registry, register: this.registry,
}) })

View File

@ -1,5 +1,5 @@
import { MemoryStorageProvider as MSP } from "matrix-bot-sdk"; import { MemoryStorageProvider as MSP } from "matrix-bot-sdk";
import { IBridgeStorageProvider } from "./StorageProvider"; import { IBridgeStorageProvider, MAX_FEED_ITEMS } from "./StorageProvider";
import { IssuesGetResponseData } from "../github/Types"; import { IssuesGetResponseData } from "../github/Types";
import { ProvisionSession } from "matrix-appservice-bridge"; import { ProvisionSession } from "matrix-appservice-bridge";
import QuickLRU from "@alloc/quick-lru"; import QuickLRU from "@alloc/quick-lru";
@ -11,10 +11,32 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
private figmaCommentIds: Map<string, string> = new Map(); private figmaCommentIds: Map<string, string> = new Map();
private widgetSessions: Map<string, ProvisionSession> = new Map(); private widgetSessions: Map<string, ProvisionSession> = new Map();
private storedFiles = new QuickLRU<string, string>({ maxSize: 128 }); private storedFiles = new QuickLRU<string, string>({ maxSize: 128 });
private feedGuids = new Map<string, Array<string>>();
constructor() { constructor() {
super(); super();
} }
async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
let set = this.feedGuids.get(url);
if (!set) {
set = []
this.feedGuids.set(url, set);
}
set.unshift(...guids);
while (set.length > MAX_FEED_ITEMS) {
set.pop();
}
}
async hasSeenFeed(url: string): Promise<boolean> {
return this.feedGuids.has(url);
}
async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return this.feedGuids.get(url)?.includes(guid) ?? false;
}
public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") { public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {
this.issues.set(`${scope}${repo}/${issueNumber}`, data); this.issues.set(`${scope}${repo}/${issueNumber}`, data);
} }

View File

@ -2,7 +2,7 @@ import { IssuesGetResponseData } from "../github/Types";
import { Redis, default as redis } from "ioredis"; import { Redis, default as redis } from "ioredis";
import { Logger } from "matrix-appservice-bridge"; import { Logger } from "matrix-appservice-bridge";
import { IBridgeStorageProvider } from "./StorageProvider"; import { IBridgeStorageProvider, MAX_FEED_ITEMS } from "./StorageProvider";
import { IFilterInfo, IStorageProvider } from "matrix-bot-sdk"; import { IFilterInfo, IStorageProvider } from "matrix-bot-sdk";
import { ProvisionSession } from "matrix-appservice-bridge"; import { ProvisionSession } from "matrix-appservice-bridge";
@ -25,6 +25,10 @@ const ISSUES_LAST_COMMENT_EXPIRE_AFTER = 14 * 24 * 60 * 60; // 7 days
const WIDGET_TOKENS = "widgets.tokens."; const WIDGET_TOKENS = "widgets.tokens.";
const WIDGET_USER_TOKENS = "widgets.user-tokens."; const WIDGET_USER_TOKENS = "widgets.user-tokens.";
const FEED_GUIDS = "feeds.guids.";
const log = new Logger("RedisASProvider"); const log = new Logger("RedisASProvider");
export class RedisStorageContextualProvider implements IStorageProvider { export class RedisStorageContextualProvider implements IStorageProvider {
@ -61,6 +65,7 @@ export class RedisStorageContextualProvider implements IStorageProvider {
} }
export class RedisStorageProvider extends RedisStorageContextualProvider implements IBridgeStorageProvider { export class RedisStorageProvider extends RedisStorageContextualProvider implements IBridgeStorageProvider {
constructor(host: string, port: number, contextSuffix = '') { constructor(host: string, port: number, contextSuffix = '') {
super(new redis(port, host), contextSuffix); super(new redis(port, host), contextSuffix);
@ -198,4 +203,18 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
public async setStoredTempFile(key: string, value: string) { public async setStoredTempFile(key: string, value: string) {
await this.redis.set(STORED_FILES_KEY + key, value); await this.redis.set(STORED_FILES_KEY + key, value);
} }
public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
const feedKey = `${FEED_GUIDS}${url}`;
await this.redis.lpush(feedKey, ...guid);
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
}
public async hasSeenFeed(url: string): Promise<boolean> {
return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
}
public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
}
} }

View File

@ -2,6 +2,13 @@ import { ProvisioningStore } from "matrix-appservice-bridge";
import { IAppserviceStorageProvider, IStorageProvider } from "matrix-bot-sdk"; import { IAppserviceStorageProvider, IStorageProvider } from "matrix-bot-sdk";
import { IssuesGetResponseData } from "../github/Types"; import { IssuesGetResponseData } from "../github/Types";
// Some RSS feeds can return a very small number of items then bounce
// back to their "normal" size, so we cannot just clobber the recent GUID list per request or else we'll
// forget what we sent and resend it. Instead, we'll keep 2x the max number of items that we've ever
// seen from this feed, up to a max of 10,000.
// Adopted from https://github.com/matrix-org/go-neb/blob/babb74fa729882d7265ff507b09080e732d060ae/services/rssbot/rssbot.go#L304
export const MAX_FEED_ITEMS = 10_000;
export interface IBridgeStorageProvider extends IAppserviceStorageProvider, IStorageProvider, ProvisioningStore { export interface IBridgeStorageProvider extends IAppserviceStorageProvider, IStorageProvider, ProvisioningStore {
connect?(): Promise<void>; connect?(): Promise<void>;
disconnect?(): Promise<void>; disconnect?(): Promise<void>;
@ -15,4 +22,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
getFigmaCommentEventId(roomId: string, figmaCommentId: string): Promise<string|null>; getFigmaCommentEventId(roomId: string, figmaCommentId: string): Promise<string|null>;
getStoredTempFile(key: string): Promise<string|null>; getStoredTempFile(key: string): Promise<string|null>;
setStoredTempFile(key: string, value: string): Promise<void>; setStoredTempFile(key: string, value: string): Promise<void>;
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
} }

View File

@ -1,21 +1,16 @@
import { MatrixError } from "matrix-bot-sdk";
import { BridgeConfigFeeds } from "../config/Config"; import { BridgeConfigFeeds } from "../config/Config";
import { ConnectionManager } from "../ConnectionManager"; import { ConnectionManager } from "../ConnectionManager";
import { FeedConnection } from "../Connections"; import { FeedConnection } from "../Connections";
import { Logger } from "matrix-appservice-bridge"; import { Logger } from "matrix-appservice-bridge";
import { MessageQueue } from "../MessageQueue"; import { MessageQueue } from "../MessageQueue";
import axios from "axios";
import Ajv from "ajv";
import axios, { AxiosResponse } from "axios";
import Metrics from "../Metrics"; import Metrics from "../Metrics";
import UserAgent from "../UserAgent";
import { randomUUID } from "crypto"; import { randomUUID } from "crypto";
import { StatusCodes } from "http-status-codes"; import { readFeed } from "../libRs";
import { FormatUtil } from "../FormatUtil"; import { IBridgeStorageProvider } from "../Stores/StorageProvider";
import { JsRssChannel, parseFeed } from "../libRs"; import UserAgent from "../UserAgent";
const log = new Logger("FeedReader"); const log = new Logger("FeedReader");
export class FeedError extends Error { export class FeedError extends Error {
constructor( constructor(
public url: string, public url: string,
@ -64,28 +59,6 @@ export interface FeedSuccess {
url: string, url: string,
} }
interface AccountData {
[url: string]: string[],
}
interface AccountDataStore {
getAccountData<T>(type: string): Promise<T>;
setAccountData<T>(type: string, data: T): Promise<void>;
}
const accountDataSchema = {
type: 'object',
patternProperties: {
"https?://.+": {
type: 'array',
items: { type: 'string' },
}
},
additionalProperties: false,
};
const ajv = new Ajv();
const validateAccountData = ajv.compile<AccountData>(accountDataSchema);
function isNonEmptyString(input: unknown): input is string { function isNonEmptyString(input: unknown): input is string {
return Boolean(input) && typeof input === 'string'; return Boolean(input) && typeof input === 'string';
} }
@ -109,35 +82,6 @@ function shuffle<T>(array: T[]): T[] {
} }
export class FeedReader { export class FeedReader {
/**
* Read a feed URL and parse it into a set of items.
* @param url The feed URL.
* @param headers Any headers to provide.
* @param timeoutMs How long to wait for the response, in milliseconds.
* @param parser The parser instance. If not provided, this creates a new parser.
* @returns The raw axios response, and the parsed feed.
*/
public static async fetchFeed(
url: string,
headers: Record<string, string>,
timeoutMs: number,
httpClient = axios,
): Promise<{ response: AxiosResponse, feed: JsRssChannel }> {
const response = await httpClient.get(url, {
headers: {
'User-Agent': UserAgent,
...headers,
},
// We don't want to wait forever for the feed.
timeout: timeoutMs,
});
if (typeof response.data !== "string") {
throw Error('Unexpected response type');
}
const feed = parseFeed(response.data);
return { response, feed };
}
private connections: FeedConnection[]; private connections: FeedConnection[];
// ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version) // ts should notice that we do in fact initialize it in constructor, but it doesn't (in this version)
@ -145,7 +89,6 @@ export class FeedReader {
private feedQueue: string[] = []; private feedQueue: string[] = [];
private seenEntries: Map<string, string[]> = new Map();
// A set of last modified times for each url. // A set of last modified times for each url.
private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map(); private cacheTimes: Map<string, { etag?: string, lastModified?: string}> = new Map();
@ -156,19 +99,26 @@ export class FeedReader {
static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries"; static readonly seenEntriesEventType = "uk.half-shot.matrix-hookshot.feed.reader.seenEntries";
private shouldRun = true; private shouldRun = true;
private timeout?: NodeJS.Timeout; private readonly timeouts: (NodeJS.Timeout|undefined)[];
get sleepingInterval() { get sleepingInterval() {
return (this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1); return (
// Calculate the number of MS to wait in between feeds.
(this.config.pollIntervalSeconds * 1000) / (this.feedQueue.length || 1)
// And multiply by the number of concurrent readers
) * this.config.pollConcurrency;
} }
constructor( constructor(
private readonly config: BridgeConfigFeeds, private readonly config: BridgeConfigFeeds,
private readonly connectionManager: ConnectionManager, private readonly connectionManager: ConnectionManager,
private readonly queue: MessageQueue, private readonly queue: MessageQueue,
private readonly accountDataStore: AccountDataStore, private readonly storage: IBridgeStorageProvider,
private readonly httpClient = axios,
) { ) {
// Ensure a fixed length array,
this.timeouts = new Array(config.pollConcurrency);
this.timeouts.fill(undefined);
Object.seal(this.timeouts);
this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection); this.connections = this.connectionManager.getAllConnectionsOfType(FeedConnection);
this.calculateFeedUrls(); this.calculateFeedUrls();
connectionManager.on('new-connection', c => { connectionManager.on('new-connection', c => {
@ -187,16 +137,14 @@ export class FeedReader {
log.debug('Loaded feed URLs:', this.observedFeedUrls); log.debug('Loaded feed URLs:', this.observedFeedUrls);
void this.loadSeenEntries().then(() => { for (let i = 0; i < config.pollConcurrency; i++) {
for (let i = 0; i < config.pollConcurrency; i++) { void this.pollFeeds(i);
void this.pollFeeds(i); }
}
});
} }
public stop() { public stop() {
clearTimeout(this.timeout);
this.shouldRun = false; this.shouldRun = false;
this.timeouts.forEach(t => clearTimeout(t));
} }
private calculateFeedUrls(): void { private calculateFeedUrls(): void {
@ -216,36 +164,6 @@ export class FeedReader {
Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size); Metrics.feedsCountDeprecated.set(this.observedFeedUrls.size);
} }
private async loadSeenEntries(): Promise<void> {
try {
const accountData = await this.accountDataStore.getAccountData<AccountData>(FeedReader.seenEntriesEventType).catch((err: MatrixError|unknown) => {
if (err instanceof MatrixError && err.statusCode === 404) {
return {} as AccountData;
} else {
throw err;
}
});
if (!validateAccountData(accountData)) {
const errors = validateAccountData.errors?.map(e => `${e.instancePath} ${e.message}`) || ['No error reported'];
throw new Error(`Invalid account data: ${errors.join(', ')}`);
}
for (const url in accountData) {
this.seenEntries.set(url, accountData[url]);
}
} catch (err: unknown) {
log.error(`Failed to load seen feed entries from accountData: ${err}. This may result in skipped entries`);
// no need to wipe it manually, next saveSeenEntries() will make it right
}
}
private async saveSeenEntries(): Promise<void> {
const accountData: AccountData = {};
for (const [url, guids] of this.seenEntries.entries()) {
accountData[url.toString()] = guids;
}
await this.accountDataStore.setAccountData(FeedReader.seenEntriesEventType, accountData);
}
/** /**
* Poll a given feed URL for data, pushing any entries found into the message queue. * Poll a given feed URL for data, pushing any entries found into the message queue.
* We also check the `cacheTimes` cache to see if the feed has recent entries that we can * We also check the `cacheTimes` cache to see if the feed has recent entries that we can
@ -260,95 +178,80 @@ export class FeedReader {
const { etag, lastModified } = this.cacheTimes.get(url) || {}; const { etag, lastModified } = this.cacheTimes.get(url) || {};
log.debug(`Checking for updates in ${url} (${etag ?? lastModified})`); log.debug(`Checking for updates in ${url} (${etag ?? lastModified})`);
try { try {
const { response, feed } = await FeedReader.fetchFeed( const result = await readFeed(url, {
url, pollTimeoutSeconds: this.config.pollTimeoutSeconds,
{ etag,
...(lastModified && { 'If-Modified-Since': lastModified}), lastModified,
...(etag && { 'If-None-Match': etag}), userAgent: UserAgent,
}, });
// We don't want to wait forever for the feed.
this.config.pollTimeoutSeconds * 1000,
this.httpClient,
);
// Store any entity tags/cache times. // Store any entity tags/cache times.
if (response.headers.ETag) { if (result.etag) {
this.cacheTimes.set(url, { etag: response.headers.ETag}); this.cacheTimes.set(url, { etag: result.etag });
} else if (response.headers['Last-Modified']) { } else if (result.lastModified) {
this.cacheTimes.set(url, { lastModified: response.headers['Last-Modified'] }); this.cacheTimes.set(url, { lastModified: result.lastModified });
} }
const { feed } = result;
let initialSync = false; let initialSync = false;
let seenGuids = this.seenEntries.get(url); if (!await this.storage.hasSeenFeed(url)) {
if (!seenGuids) {
initialSync = true; initialSync = true;
seenGuids = [];
seenEntriesChanged = true; // to ensure we only treat it as an initialSync once seenEntriesChanged = true; // to ensure we only treat it as an initialSync once
} }
// migrate legacy, cleartext guids to their md5-hashed counterparts
seenGuids = seenGuids.map(guid => guid.startsWith('md5:') ? guid : this.hashGuid(guid));
const seenGuidsSet = new Set(seenGuids);
const newGuids = []; const newGuids = [];
log.debug(`Found ${feed.items.length} entries in ${url}`); if (feed) {
// If undefined, we got a not-modified.
log.debug(`Found ${feed.items.length} entries in ${url}`);
for (const item of feed.items) { for (const item of feed.items) {
// Find the first guid-like that looks like a string. // Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
// Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage. if (!item.hashId) {
if (!item.hashId) { log.error(`Could not determine guid for entry in ${url}, skipping`);
log.error(`Could not determine guid for entry in ${url}, skipping`); continue;
continue; }
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId);
if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
continue;
}
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
continue;
}
const entry = {
feed: {
title: isNonEmptyString(feed.title) ? stripHtml(feed.title) : null,
url: url,
},
title: isNonEmptyString(item.title) ? stripHtml(item.title) : null,
pubdate: item.pubdate ?? null,
summary: item.summary ?? null,
author: item.author ?? null,
link: item.link ?? null,
fetchKey
};
log.debug('New entry:', entry);
seenEntriesChanged = true;
this.queue.push<FeedEntry>({ eventName: 'feed.entry', sender: 'FeedReader', data: entry });
} }
const hashId = `md5:${item.hashId}`;
newGuids.push(hashId); if (seenEntriesChanged) {
await this.storage.storeFeedGuids(url, ...newGuids);
if (initialSync) {
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
continue;
} }
if (seenGuidsSet.has(hashId)) {
log.debug('Skipping already seen entry', item.id ?? hashId);
continue;
}
const entry = {
feed: {
title: isNonEmptyString(feed.title) ? stripHtml(feed.title) : null,
url: url,
},
title: isNonEmptyString(item.title) ? stripHtml(item.title) : null,
pubdate: item.pubdate ?? null,
summary: item.summary ?? null,
author: item.author ?? null,
link: item.link ?? null,
fetchKey
};
log.debug('New entry:', entry);
seenEntriesChanged = true;
this.queue.push<FeedEntry>({ eventName: 'feed.entry', sender: 'FeedReader', data: entry });
} }
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
if (seenEntriesChanged) {
// Some RSS feeds can return a very small number of items then bounce
// back to their "normal" size, so we cannot just clobber the recent GUID list per request or else we'll
// forget what we sent and resend it. Instead, we'll keep 2x the max number of items that we've ever
// seen from this feed, up to a max of 10,000.
// Adopted from https://github.com/matrix-org/go-neb/blob/babb74fa729882d7265ff507b09080e732d060ae/services/rssbot/rssbot.go#L304
const maxGuids = Math.min(Math.max(2 * newGuids.length, seenGuids.length), 10_000);
const newSeenItems = Array.from(new Set([ ...newGuids, ...seenGuids ]).values()).slice(0, maxGuids);
this.seenEntries.set(url, newSeenItems);
}
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url: url } });
// Clear any feed failures // Clear any feed failures
this.feedsFailingHttp.delete(url); this.feedsFailingHttp.delete(url);
this.feedsFailingParsing.delete(url); this.feedsFailingParsing.delete(url);
} catch (err: unknown) { } catch (err: unknown) {
if (axios.isAxiosError(err)) { // TODO: Proper Rust Type error.
// No new feed items, skip. if ((err as Error).message.includes('Failed to fetch feed due to HTTP')) {
if (err.response?.status === StatusCodes.NOT_MODIFIED) {
return false;
}
this.feedsFailingHttp.add(url); this.feedsFailingHttp.add(url);
} else { } else {
this.feedsFailingParsing.add(url); this.feedsFailingParsing.add(url);
@ -383,7 +286,7 @@ export class FeedReader {
if (url) { if (url) {
if (await this.pollFeed(url)) { if (await this.pollFeed(url)) {
await this.saveSeenEntries(); log.debug(`Feed changed and will be saved`);
} }
const elapsed = Date.now() - fetchingStarted; const elapsed = Date.now() - fetchingStarted;
Metrics.feedFetchMs.set(elapsed); Metrics.feedFetchMs.set(elapsed);
@ -399,15 +302,11 @@ export class FeedReader {
log.debug(`No feeds available to poll for worker ${workerId}`); log.debug(`No feeds available to poll for worker ${workerId}`);
} }
this.timeout = setTimeout(() => { this.timeouts[workerId] = setTimeout(() => {
if (!this.shouldRun) { if (!this.shouldRun) {
return; return;
} }
void this.pollFeeds(workerId); void this.pollFeeds(workerId);
}, sleepFor); }, sleepFor);
} }
private hashGuid(guid: string): string {
return `md5:${FormatUtil.hashId(guid)}`;
}
} }

View File

@ -1,7 +1,11 @@
use std::str::FromStr; use std::{str::FromStr, time::Duration};
use atom_syndication::{Error as AtomError, Feed, Person}; use atom_syndication::{Error as AtomError, Feed, Person};
use napi::bindgen_prelude::{Error as JsError, Status}; use napi::bindgen_prelude::{Error as JsError, Status};
use reqwest::{
header::{HeaderMap, HeaderValue},
Method, StatusCode,
};
use rss::{Channel, Error as RssError}; use rss::{Channel, Error as RssError};
use crate::format_util::hash_id; use crate::format_util::hash_id;
@ -26,6 +30,23 @@ pub struct JsRssChannel {
pub items: Vec<FeedItem>, pub items: Vec<FeedItem>,
} }
#[derive(Serialize, Debug, Deserialize)]
#[napi(object)]
pub struct ReadFeedOptions {
pub last_modified: Option<String>,
pub etag: Option<String>,
pub poll_timeout_seconds: i64,
pub user_agent: String,
}
#[derive(Serialize, Debug, Deserialize)]
#[napi(object)]
pub struct FeedResult {
pub feed: Option<JsRssChannel>,
pub etag: Option<String>,
pub last_modified: Option<String>,
}
fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel { fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
JsRssChannel { JsRssChannel {
title: channel.title().to_string(), title: channel.title().to_string(),
@ -145,3 +166,65 @@ pub fn js_parse_feed(xml: String) -> Result<JsRssChannel, JsError> {
Err(RssError::Eof) => Err(JsError::new(Status::Unknown, "Unexpected end of input")), Err(RssError::Eof) => Err(JsError::new(Status::Unknown, "Unexpected end of input")),
} }
} }
#[napi(js_name = "readFeed")]
pub async fn js_read_feed(url: String, options: ReadFeedOptions) -> Result<FeedResult, JsError> {
let client = reqwest::Client::new();
let req = client
.request(Method::GET, url)
.timeout(Duration::from_secs(
options.poll_timeout_seconds.try_into().unwrap(),
));
let mut headers: HeaderMap = HeaderMap::new();
headers.append(
"User-Agent",
HeaderValue::from_str(&options.user_agent).unwrap(),
);
if let Some(last_modifed) = options.last_modified {
headers.append(
"If-Modified-Since",
HeaderValue::from_str(&last_modifed).unwrap(),
);
}
if let Some(etag) = options.etag {
headers.append("If-None-Match", HeaderValue::from_str(&etag).unwrap());
}
match req.headers(headers).send().await {
Ok(res) => {
let res_headers = res.headers().clone();
match res.status() {
StatusCode::OK => match res.text().await {
Ok(body) => match js_parse_feed(body) {
Ok(feed) => Ok(FeedResult {
feed: Some(feed),
etag: res_headers
.get("ETag")
.map(|v| v.to_str().unwrap())
.map(|v| v.to_string()),
last_modified: res_headers
.get("Last-Modified")
.map(|v| v.to_str().unwrap())
.map(|v| v.to_string()),
}),
Err(err) => Err(err),
},
Err(err) => Err(JsError::new(Status::Unknown, err)),
},
StatusCode::NOT_MODIFIED => Ok(FeedResult {
feed: None,
etag: None,
last_modified: None,
}),
status => Err(JsError::new(
Status::Unknown,
format!("Failed to fetch feed due to HTTP {}", status),
)),
}
}
Err(err) => Err(JsError::new(Status::Unknown, err)),
}
}

View File

@ -1,4 +1,3 @@
import { AxiosResponse, AxiosStatic } from "axios";
import { expect } from "chai"; import { expect } from "chai";
import EventEmitter from "events"; import EventEmitter from "events";
import { BridgeConfigFeeds } from "../src/config/Config"; import { BridgeConfigFeeds } from "../src/config/Config";
@ -6,6 +5,9 @@ import { ConnectionManager } from "../src/ConnectionManager";
import { IConnection } from "../src/Connections"; import { IConnection } from "../src/Connections";
import { FeedEntry, FeedReader } from "../src/feeds/FeedReader"; import { FeedEntry, FeedReader } from "../src/feeds/FeedReader";
import { MessageQueue, MessageQueueMessage } from "../src/MessageQueue"; import { MessageQueue, MessageQueueMessage } from "../src/MessageQueue";
import { MemoryStorageProvider } from "../src/Stores/MemoryStorageProvider";
import { Server, createServer } from 'http';
import { AddressInfo } from "net";
class MockConnectionManager extends EventEmitter { class MockConnectionManager extends EventEmitter {
constructor( constructor(
@ -37,38 +39,41 @@ class MockMessageQueue extends EventEmitter implements MessageQueue {
} }
} }
class MockHttpClient { async function constructFeedReader(feedResponse: () => {headers: Record<string,string>, data: string}) {
constructor(public response: AxiosResponse) {} const httpServer = await new Promise<Server>(resolve => {
const srv = createServer((_req, res) => {
get(): Promise<AxiosResponse> { res.writeHead(200);
return Promise.resolve(this.response); const { headers, data } = feedResponse();
} Object.entries(headers).forEach(([key,value]) => {
} res.setHeader(key, value);
});
const FEED_URL = 'http://test/'; res.write(data);
res.end();
function constructFeedReader(feedResponse: () => {headers: Record<string,string>, data: string}) { }).listen(0, '127.0.0.1', () => {
resolve(srv);
});
});
const address = httpServer.address() as AddressInfo;
const feedUrl = `http://127.0.0.1:${address.port}/`
const config = new BridgeConfigFeeds({ const config = new BridgeConfigFeeds({
enabled: true, enabled: true,
pollIntervalSeconds: 1, pollIntervalSeconds: 1,
pollTimeoutSeconds: 1, pollTimeoutSeconds: 1,
}); });
const cm = new MockConnectionManager([{ feedUrl: FEED_URL } as unknown as IConnection]) as unknown as ConnectionManager const cm = new MockConnectionManager([{ feedUrl } as unknown as IConnection]) as unknown as ConnectionManager
const mq = new MockMessageQueue(); const mq = new MockMessageQueue();
const storage = new MemoryStorageProvider();
// Ensure we don't initial sync by storing a guid.
await storage.storeFeedGuids(feedUrl, '-test-guid-');
const feedReader = new FeedReader( const feedReader = new FeedReader(
config, cm, mq, config, cm, mq, storage,
{
getAccountData: <T>() => Promise.resolve({ [FEED_URL]: [] } as unknown as T),
setAccountData: () => Promise.resolve(),
},
new MockHttpClient({ ...feedResponse() } as AxiosResponse) as unknown as AxiosStatic,
); );
return {config, cm, mq, feedReader}; return {config, cm, mq, feedReader, feedUrl, httpServer};
} }
describe("FeedReader", () => { describe("FeedReader", () => {
it("should correctly handle empty titles", async () => { it("should correctly handle empty titles", async () => {
const { mq, feedReader} = constructFeedReader(() => ({ const { mq, feedReader, httpServer } = await constructFeedReader(() => ({
headers: {}, data: ` headers: {}, data: `
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0"> <rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
@ -84,6 +89,8 @@ describe("FeedReader", () => {
` `
})); }));
after(() => httpServer.close());
const event: any = await new Promise((resolve) => { const event: any = await new Promise((resolve) => {
mq.on('pushed', (data) => { resolve(data); feedReader.stop() }); mq.on('pushed', (data) => { resolve(data); feedReader.stop() });
}); });
@ -93,7 +100,7 @@ describe("FeedReader", () => {
expect(event.data.title).to.equal(null); expect(event.data.title).to.equal(null);
}); });
it("should handle RSS 2.0 feeds", async () => { it("should handle RSS 2.0 feeds", async () => {
const { mq, feedReader} = constructFeedReader(() => ({ const { mq, feedReader, httpServer } = await constructFeedReader(() => ({
headers: {}, data: ` headers: {}, data: `
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0"> <rss version="2.0">
@ -118,6 +125,8 @@ describe("FeedReader", () => {
` `
})); }));
after(() => httpServer.close());
const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => { const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => {
mq.on('pushed', (data) => { resolve(data); feedReader.stop() }); mq.on('pushed', (data) => { resolve(data); feedReader.stop() });
}); });
@ -131,7 +140,7 @@ describe("FeedReader", () => {
expect(event.data.pubdate).to.equal('Sun, 6 Sep 2009 16:20:00 +0000'); expect(event.data.pubdate).to.equal('Sun, 6 Sep 2009 16:20:00 +0000');
}); });
it("should handle RSS feeds with a permalink url", async () => { it("should handle RSS feeds with a permalink url", async () => {
const { mq, feedReader} = constructFeedReader(() => ({ const { mq, feedReader, httpServer } = await constructFeedReader(() => ({
headers: {}, data: ` headers: {}, data: `
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0"> <rss version="2.0">
@ -155,6 +164,8 @@ describe("FeedReader", () => {
` `
})); }));
after(() => httpServer.close());
const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => { const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => {
mq.on('pushed', (data) => { resolve(data); feedReader.stop() }); mq.on('pushed', (data) => { resolve(data); feedReader.stop() });
}); });
@ -168,7 +179,7 @@ describe("FeedReader", () => {
expect(event.data.pubdate).to.equal('Sun, 6 Sep 2009 16:20:00 +0000'); expect(event.data.pubdate).to.equal('Sun, 6 Sep 2009 16:20:00 +0000');
}); });
it("should handle Atom feeds", async () => { it("should handle Atom feeds", async () => {
const { mq, feedReader} = constructFeedReader(() => ({ const { mq, feedReader, httpServer } = await constructFeedReader(() => ({
headers: {}, data: ` headers: {}, data: `
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom"> <feed xmlns="http://www.w3.org/2005/Atom">
@ -196,6 +207,8 @@ describe("FeedReader", () => {
` `
})); }));
after(() => httpServer.close());
const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => { const event: MessageQueueMessage<FeedEntry> = await new Promise((resolve) => {
mq.on('pushed', (data) => { resolve(data); feedReader.stop() }); mq.on('pushed', (data) => { resolve(data); feedReader.stop() });
}); });
@ -209,7 +222,7 @@ describe("FeedReader", () => {
expect(event.data.pubdate).to.equal('Sat, 13 Dec 2003 18:30:02 +0000'); expect(event.data.pubdate).to.equal('Sat, 13 Dec 2003 18:30:02 +0000');
}); });
it("should not duplicate feed entries", async () => { it("should not duplicate feed entries", async () => {
const { mq, feedReader} = constructFeedReader(() => ({ const { mq, feedReader, httpServer, feedUrl } = await constructFeedReader(() => ({
headers: {}, data: ` headers: {}, data: `
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom"> <feed xmlns="http://www.w3.org/2005/Atom">
@ -227,11 +240,13 @@ describe("FeedReader", () => {
` `
})); }));
after(() => httpServer.close());
const events: MessageQueueMessage<FeedEntry>[] = []; const events: MessageQueueMessage<FeedEntry>[] = [];
mq.on('pushed', (data) => { if (data.eventName === 'feed.entry') {events.push(data);} }); mq.on('pushed', (data) => { if (data.eventName === 'feed.entry') {events.push(data);} });
await feedReader.pollFeed(FEED_URL); await feedReader.pollFeed(feedUrl);
await feedReader.pollFeed(FEED_URL); await feedReader.pollFeed(feedUrl);
await feedReader.pollFeed(FEED_URL); await feedReader.pollFeed(feedUrl);
feedReader.stop(); feedReader.stop();
expect(events).to.have.lengthOf(1); expect(events).to.have.lengthOf(1);
}); });

View File

@ -919,10 +919,10 @@
resolved "https://registry.yarnpkg.com/@mdn/browser-compat-data/-/browser-compat-data-4.1.14.tgz#45b45f2fcd8fe766950e5abc40efde94b4348efd" resolved "https://registry.yarnpkg.com/@mdn/browser-compat-data/-/browser-compat-data-4.1.14.tgz#45b45f2fcd8fe766950e5abc40efde94b4348efd"
integrity sha512-pndsgd4jXIGcgWKPXkN5AL1rdwhgQpLXWyK25jb42SUaeujs/GhRK8+Q4W97RTiCirf/DoaahcTI/3Op6+/gfw== integrity sha512-pndsgd4jXIGcgWKPXkN5AL1rdwhgQpLXWyK25jb42SUaeujs/GhRK8+Q4W97RTiCirf/DoaahcTI/3Op6+/gfw==
"@napi-rs/cli@^2.2.0": "@napi-rs/cli@^2.13.2":
version "2.2.0" version "2.16.1"
resolved "https://registry.yarnpkg.com/@napi-rs/cli/-/cli-2.2.0.tgz#0129406192c2dfff6e8fc3de0c8be1d2ec286e3f" resolved "https://registry.yarnpkg.com/@napi-rs/cli/-/cli-2.16.1.tgz#912e1169be6ff8bb5e1e22bb702adcc5e73e232b"
integrity sha512-lXOKq0EZWztzHIlpXhKG0Nrv/PDZAl/yBsqQTG0aDfdjGCJudtPgWLR7zzaJoYzkkdFJo0r+teYYzgC+cXB4KQ== integrity sha512-L0Gr5iEQIDEbvWdDr1HUaBOxBSHL1VZhWSk1oryawoT8qJIY+KGfLFelU+Qma64ivCPbxYpkfPoKYVG3rcoGIA==
"@nodelib/fs.scandir@2.1.5": "@nodelib/fs.scandir@2.1.5":
version "2.1.5" version "2.1.5"
@ -3303,11 +3303,16 @@ fn.name@1.x.x:
resolved "https://registry.yarnpkg.com/fn.name/-/fn.name-1.1.0.tgz#26cad8017967aea8731bc42961d04a3d5988accc" resolved "https://registry.yarnpkg.com/fn.name/-/fn.name-1.1.0.tgz#26cad8017967aea8731bc42961d04a3d5988accc"
integrity sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw== integrity sha512-GRnmB5gPyJpAhTQdSZTSp9uaPSvl09KoYcMQtsB9rQoOmzs9dH6ffeccH+Z+cv6P68Hu5bC6JjRh4Ah/mHSNRw==
follow-redirects@^1.14.0, follow-redirects@^1.14.4: follow-redirects@^1.14.0:
version "1.14.8" version "1.14.8"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.14.8.tgz#016996fb9a11a100566398b1c6839337d7bfa8fc" resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.14.8.tgz#016996fb9a11a100566398b1c6839337d7bfa8fc"
integrity sha512-1x0S9UVJHsQprFcEC/qnNzBLcIxsjAV905f/UkQxbclCsoTWlacCNOpQa/anodLl2uaEKFhfWOvM2Qg77+15zA== integrity sha512-1x0S9UVJHsQprFcEC/qnNzBLcIxsjAV905f/UkQxbclCsoTWlacCNOpQa/anodLl2uaEKFhfWOvM2Qg77+15zA==
follow-redirects@^1.14.4:
version "1.15.2"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.2.tgz#b460864144ba63f2681096f274c4e57026da2c13"
integrity sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==
follow-redirects@^1.14.9: follow-redirects@^1.14.9:
version "1.15.1" version "1.15.1"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.1.tgz#0ca6a452306c9b276e4d3127483e29575e207ad5" resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.1.tgz#0ca6a452306c9b276e4d3127483e29575e207ad5"
@ -5145,14 +5150,7 @@ process-on-spawn@^1.0.0:
dependencies: dependencies:
fromentries "^1.2.0" fromentries "^1.2.0"
prom-client@^14.0.1: prom-client@^14.1.0, prom-client@^14.2.0:
version "14.0.1"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.0.1.tgz#bdd9583e02ec95429677c0e013712d42ef1f86a8"
integrity sha512-HxTArb6fkOntQHoRGvv4qd/BkorjliiuO2uSWC2KC17MUTKYttWdDoXX/vxOhQdkoECEM9BBH0pj2l8G8kev6w==
dependencies:
tdigest "^0.1.1"
prom-client@^14.1.0:
version "14.2.0" version "14.2.0"
resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.2.0.tgz#ca94504e64156f6506574c25fb1c34df7812cf11" resolved "https://registry.yarnpkg.com/prom-client/-/prom-client-14.2.0.tgz#ca94504e64156f6506574c25fb1c34df7812cf11"
integrity sha512-sF308EhTenb/pDRPakm+WgiN+VdM/T1RaHj1x+MvAuT8UiQP8JmOEbxVqtkbfR4LrvOg5n7ic01kRBDGXjYikA== integrity sha512-sF308EhTenb/pDRPakm+WgiN+VdM/T1RaHj1x+MvAuT8UiQP8JmOEbxVqtkbfR4LrvOg5n7ic01kRBDGXjYikA==
@ -6027,10 +6025,10 @@ typedarray-to-buffer@^3.1.5:
dependencies: dependencies:
is-typedarray "^1.0.0" is-typedarray "^1.0.0"
typescript@^5.0.4: typescript@^5.1.3:
version "5.0.4" version "5.1.3"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.0.4.tgz#b217fd20119bd61a94d4011274e0ab369058da3b" resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.1.3.tgz#8d84219244a6b40b6fb2b33cc1c062f715b9e826"
integrity sha512-cW9T5W9xY37cc+jfEnaUvX91foxtHkza3Nw3wkoF4sSlKn0MONdkdEndig/qPBWXNkmplh3NzayQzCiHM4/hqw== integrity sha512-XH627E9vkeqhlZFQuL+UsyAXEnibT0kWR2FWONlr4sTjvxyJYnyefgrkyECLzM5NenmKzRAy2rR/OlYLA1HkZw==
uc.micro@^1.0.1, uc.micro@^1.0.5: uc.micro@^1.0.1, uc.micro@^1.0.5:
version "1.0.6" version "1.0.6"