mirror of
https://github.com/matrix-org/matrix-hookshot.git
synced 2025-03-10 21:19:13 +00:00
Speed up checking for previous guids.
This commit is contained in:
parent
958bdcb26d
commit
bec670f712
@ -35,8 +35,8 @@ export class MemoryStorageProvider extends MSP implements IBridgeStorageProvider
|
||||
return this.feedGuids.has(url);
|
||||
}
|
||||
|
||||
async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
|
||||
return this.feedGuids.get(url)?.includes(guid) ?? false;
|
||||
async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
|
||||
return this.feedGuids.get(url)?.filter((existingGuid) => guids.includes(existingGuid)) ?? [];
|
||||
}
|
||||
|
||||
public async setGithubIssue(repo: string, issueNumber: string, data: IssuesGetResponseData, scope = "") {
|
||||
|
@ -29,11 +29,10 @@ const WIDGET_USER_TOKENS = "widgets.user-tokens.";
|
||||
|
||||
const FEED_GUIDS = "feeds.guids.";
|
||||
|
||||
|
||||
|
||||
const log = new Logger("RedisASProvider");
|
||||
|
||||
export class RedisStorageContextualProvider implements IStorageProvider {
|
||||
|
||||
constructor(protected readonly redis: Redis, protected readonly contextSuffix = '') { }
|
||||
|
||||
public setSyncToken(token: string|null){
|
||||
@ -216,9 +215,9 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
|
||||
await this.redis.set(key, JSON.stringify(value));
|
||||
}
|
||||
|
||||
public async storeFeedGuids(url: string, ...guid: string[]): Promise<void> {
|
||||
public async storeFeedGuids(url: string, ...guids: string[]): Promise<void> {
|
||||
const feedKey = `${FEED_GUIDS}${url}`;
|
||||
await this.redis.lpush(feedKey, ...guid);
|
||||
await this.redis.lpush(feedKey, ...guids);
|
||||
await this.redis.ltrim(feedKey, 0, MAX_FEED_ITEMS);
|
||||
}
|
||||
|
||||
@ -226,7 +225,16 @@ export class RedisStorageProvider extends RedisStorageContextualProvider impleme
|
||||
return (await this.redis.exists(`${FEED_GUIDS}${url}`)) === 1;
|
||||
}
|
||||
|
||||
public async hasSeenFeedGuid(url: string, guid: string): Promise<boolean> {
|
||||
return (await this.redis.lpos(`${FEED_GUIDS}${url}`, guid)) != null;
|
||||
public async hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]> {
|
||||
let multi = this.redis.multi();
|
||||
for (const guid of guids) {
|
||||
multi = multi.lpos(`${FEED_GUIDS}${url}`, guid);
|
||||
}
|
||||
const res = await multi.exec();
|
||||
if (res === null) {
|
||||
// Just assume we've seen none.
|
||||
return [];
|
||||
}
|
||||
return guids.filter((_guid, index) => res[index][1] !== null);
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ export interface IBridgeStorageProvider extends IAppserviceStorageProvider, ISto
|
||||
setStoredTempFile(key: string, value: string): Promise<void>;
|
||||
getGitlabDiscussionThreads(connectionId: string): Promise<SerializedGitlabDiscussionThreads>;
|
||||
setGitlabDiscussionThreads(connectionId: string, value: SerializedGitlabDiscussionThreads): Promise<void>;
|
||||
storeFeedGuids(url: string, ...guid: string[]): Promise<void>;
|
||||
hasSeenFeed(url: string, ...guid: string[]): Promise<boolean>;
|
||||
hasSeenFeedGuid(url: string, guid: string): Promise<boolean>;
|
||||
storeFeedGuids(url: string, ...guids: string[]): Promise<void>;
|
||||
hasSeenFeed(url: string): Promise<boolean>;
|
||||
hasSeenFeedGuids(url: string, ...guids: string[]): Promise<string[]>;
|
||||
}
|
@ -196,22 +196,20 @@ export class FeedReader {
|
||||
if (feed) {
|
||||
// If undefined, we got a not-modified.
|
||||
log.debug(`Found ${feed.items.length} entries in ${url}`);
|
||||
|
||||
const seenItems = await this.storage.hasSeenFeedGuids(url, ...feed.items.filter(item => !!item.hashId).map(item => item.hashId!))
|
||||
for (const item of feed.items) {
|
||||
// Some feeds have a nasty habit of leading a empty tag there, making us parse it as garbage.
|
||||
if (!item.hashId) {
|
||||
log.error(`Could not determine guid for entry in ${url}, skipping`);
|
||||
continue;
|
||||
}
|
||||
const hashId = `md5:${item.hashId}`;
|
||||
newGuids.push(hashId);
|
||||
|
||||
if (initialSync) {
|
||||
log.debug(`Skipping entry ${item.id ?? hashId} since we're performing an initial sync`);
|
||||
if (seenItems.includes(item.hashId)) {
|
||||
continue;
|
||||
}
|
||||
if (await this.storage.hasSeenFeedGuid(url, hashId)) {
|
||||
log.debug('Skipping already seen entry', item.id ?? hashId);
|
||||
newGuids.push(item.hashId);
|
||||
|
||||
if (initialSync) {
|
||||
log.debug(`Skipping entry ${item.id ?? item.hashId} since we're performing an initial sync`);
|
||||
continue;
|
||||
}
|
||||
const entry = {
|
||||
@ -236,7 +234,6 @@ export class FeedReader {
|
||||
if (seenEntriesChanged && newGuids.length) {
|
||||
await this.storage.storeFeedGuids(url, ...newGuids);
|
||||
}
|
||||
|
||||
}
|
||||
this.queue.push<FeedSuccess>({ eventName: 'feed.success', sender: 'FeedReader', data: { url } });
|
||||
// Clear any feed failures
|
||||
@ -291,7 +288,7 @@ export class FeedReader {
|
||||
log.warn(`It took us longer to update the feeds than the configured pool interval`);
|
||||
}
|
||||
} else {
|
||||
// It may be possible that we have more workers than feeds. This will cause the worker to just sleep.
|
||||
// It is possible that we have more workers than feeds. This will cause the worker to just sleep.
|
||||
log.debug(`No feeds available to poll for worker ${workerId}`);
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ fn parse_channel_to_js_result(channel: &Channel) -> JsRssChannel {
|
||||
.map(|f| f.value)
|
||||
.or(item.link.clone())
|
||||
.or(item.title.clone())
|
||||
.and_then(|f| hash_id(f).ok()),
|
||||
.and_then(|f| Some(format!("md5:{:?}", hash_id(f)))),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user