From f79eb957dea642064763cf3e5a7cf525222365d0 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 24 Feb 2022 11:10:12 +0000 Subject: [PATCH] client: Move sliding sync loop code to sync.js - use callbacks to propagate room data and notices when the response has been processed. --- client/README.md | 2 + client/index.js | 395 ++++++++++++----------------------------------- client/sync.js | 313 ++++++++++++++++++++++++++++++++++++- 3 files changed, 412 insertions(+), 298 deletions(-) diff --git a/client/README.md b/client/README.md index 7c88118..e615b33 100644 --- a/client/README.md +++ b/client/README.md @@ -11,3 +11,5 @@ developers who want to implement Sliding Sync into their clients. The client is - `sync.js` : Sliding Sync code. To understand sliding sync, you need to read `index.js` and `sync.js`. The rest of it can be ignored. + +The client code uses Prettier as a code formatter. diff --git a/client/index.js b/client/index.js index b2f9cb7..24885d1 100644 --- a/client/index.js +++ b/client/index.js @@ -1,12 +1,16 @@ // This file contains the entry point for the client, as well as DOM interactions. -import { SlidingList, SlidingSyncConnection } from "./sync.js"; +import { + SlidingList, + SlidingSyncConnection, + SlidingSync, + LifecycleSyncComplete, + LifecycleSyncRequestFinished, +} from "./sync.js"; import * as render from "./render.js"; import * as devtools from "./devtools.js"; -let activeSessionId; -let activeRoomId = ""; // the room currently being viewed +let slidingSync; let syncConnection = new SlidingSyncConnection(); - let activeLists = [ new SlidingList("Direct Messages", { is_dm: true, @@ -16,15 +20,6 @@ let activeLists = [ }), ]; -const requiredStateEventsInList = [ - ["m.room.avatar", ""], - ["m.room.tombstone", ""], -]; -const requiredStateEventsInRoom = [ - ["m.room.avatar", ""], - ["m.room.topic", ""], -]; - // this is the main data structure the client uses to remember and render rooms. Attach it to // the window to allow easy introspection. let rooms = { @@ -190,9 +185,10 @@ const onRoomClick = (e) => { console.log("failed to find room for onclick"); return; } - // assign global state - activeRoomId = activeLists[listIndex].roomIndexToRoomId[index]; - renderRoomContent(activeRoomId, true); + // assign room subscription + slidingSync.roomSubscription = + activeLists[listIndex].roomIndexToRoomId[index]; + renderRoomContent(slidingSync.roomSubscription, true); // get the highlight on the room const roomListElements = document.getElementsByClassName("roomlist"); for (let i = 0; i < roomListElements.length; i++) { @@ -203,7 +199,7 @@ const onRoomClick = (e) => { }; const renderRoomContent = (roomId, refresh) => { - if (roomId !== activeRoomId) { + if (roomId !== slidingSync.roomSubscription) { return; } const container = document.getElementById("messages"); @@ -214,11 +210,11 @@ const renderRoomContent = (roomId, refresh) => { container.removeChild(container.firstChild); } } - let room = rooms.roomIdToRoom[activeRoomId]; + let room = rooms.roomIdToRoom[slidingSync.roomSubscription]; if (!room) { console.error( "renderRoomContent: unknown active room ID ", - activeRoomId + slidingSync.roomSubscription ); return; } @@ -327,7 +323,7 @@ const renderList = (container, listIndex) => { roomCell.getElementsByClassName("roomavatar")[0].src = "/client/placeholder.svg"; } - if (roomId === activeRoomId) { + if (roomId === slidingSync.roomSubscription) { roomCell.style = "background: #d7d7f7"; } if (r.highlight_count > 0) { @@ -364,282 +360,94 @@ const renderList = (container, listIndex) => { } } }; -const sleep = (ms) => { - return new Promise((resolve) => setTimeout(resolve, ms)); -}; -// SYNC 0 2 a b c; SYNC 6 8 d e f; DELETE 7; INSERT 0 e; -// 0 1 2 3 4 5 6 7 8 -// a b c d e f -// a b c d _ f -// e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it -const indexInRange = (listIndex, i) => { - let isInRange = false; - activeLists[listIndex].activeRanges.forEach((r) => { - if (r[0] <= i && i <= r[1]) { - isInRange = true; +const doSyncLoop = async (accessToken) => { + if (slidingSync) { + console.log("Terminating old loop"); + slidingSync.stop(); + } + console.log("Starting sync loop"); + slidingSync = new SlidingSync(activeLists, syncConnection); + slidingSync.addLifecycleListener((state, resp, err) => { + switch (state) { + case LifecycleSyncComplete: + const roomListElements = + document.getElementsByClassName("roomlist"); + for (let i = 0; i < roomListElements.length; i++) { + renderList(roomListElements[i], i); + } + + // check for duplicates and rooms outside tracked ranges which should never happen but can if there's a bug + activeLists.forEach((list, listIndex) => { + let roomIdToPositions = {}; + let dupeRoomIds = new Set(); + let indexesOutsideRanges = new Set(); + Object.keys(list.roomIndexToRoomId).forEach((i) => { + let rid = list.roomIndexToRoomId[i]; + if (!rid) { + return; + } + let positions = roomIdToPositions[rid] || []; + positions.push(i); + roomIdToPositions[rid] = positions; + if (positions.length > 1) { + dupeRoomIds.add(rid); + } + let isInsideRange = false; + list.activeRanges.forEach((r) => { + if (i >= r[0] && i <= r[1]) { + isInsideRange = true; + } + }); + if (!isInsideRange) { + indexesOutsideRanges.add(i); + } + }); + dupeRoomIds.forEach((rid) => { + console.log( + rid, + "in list", + listIndex, + "has duplicate indexes:", + roomIdToPositions[rid] + ); + }); + if (indexesOutsideRanges.size > 0) { + console.log( + "list", + listIndex, + "tracking indexes outside of tracked ranges:", + JSON.stringify([...indexesOutsideRanges]) + ); + } + }); + + devtools.svgify( + document.getElementById("listgraph"), + activeLists, + resp + ); + break; + case LifecycleSyncRequestFinished: + if (err) { + console.error("/sync failed:", err); + document.getElementById("errorMsg").textContent = err; + } else { + document.getElementById("errorMsg").textContent = ""; + } + break; } }); - return isInRange; -}; - -const doSyncLoop = async (accessToken, sessionId) => { - console.log( - "Starting sync loop. Active: ", - activeSessionId, - " this:", - sessionId - ); - - let currentPos; - let currentSub = ""; - while (sessionId === activeSessionId) { - let resp; - try { - // these fields are always required - let reqBody = { - lists: activeLists.map((al) => { - let l = { - ranges: al.activeRanges, - filters: al.getFilters(), - }; - // if this is the first request on this session, send sticky request data which never changes - if (!currentPos) { - l.required_state = requiredStateEventsInList; - l.timeline_limit = 1; - l.sort = [ - "by_highlight_count", - "by_notification_count", - "by_recency", - ]; - } - return l; - }), - }; - // check if we are (un)subscribing to a room and modify request this one time for it - let subscribingToRoom; - if (activeRoomId && currentSub !== activeRoomId) { - if (currentSub) { - reqBody.unsubscribe_rooms = [currentSub]; - } - reqBody.room_subscriptions = { - [activeRoomId]: { - required_state: requiredStateEventsInRoom, - timeline_limit: 30, - }, - }; - // hold a ref to the active room ID as it may change by the time we return from doSyncRequest - subscribingToRoom = activeRoomId; - } - resp = await syncConnection.doSyncRequest( - accessToken, - currentPos, - reqBody - ); - currentPos = resp.pos; - // update what we think we're subscribed to. - if (subscribingToRoom) { - currentSub = subscribingToRoom; - } - if (!resp.ops) { - resp.ops = []; - } - if (resp.counts) { - resp.counts.forEach((count, index) => { - activeLists[index].joinedCount = count; - }); - } - // reset any error message - document.getElementById("errorMsg").textContent = ""; - } catch (err) { - if (err.name !== "AbortError") { - console.error("/sync failed:", err); - document.getElementById("errorMsg").textContent = err; - await sleep(3000); - } - } - if (!resp) { - continue; - } - - Object.keys(resp.room_subscriptions).forEach((roomId) => { - accumulateRoomData( - resp.room_subscriptions[roomId], - rooms.roomIdToRoom[roomId] !== undefined - ); - renderRoomContent(roomId); - }); - - // TODO: clear gapIndex immediately after next op to avoid a genuine DELETE shifting incorrectly e.g leaving a room - let gapIndexes = {}; - resp.counts.forEach((count, index) => { - gapIndexes[index] = -1; - }); - resp.ops.forEach((op) => { - if (op.op === "DELETE") { - console.log("DELETE", op.list, op.index, ";"); - delete activeLists[op.list].roomIndexToRoomId[op.index]; - gapIndexes[op.list] = op.index; - } else if (op.op === "INSERT") { - console.log("INSERT", op.list, op.index, op.room.room_id, ";"); - if (activeLists[op.list].roomIndexToRoomId[op.index]) { - const gapIndex = gapIndexes[op.list]; - // something is in this space, shift items out of the way - if (gapIndex < 0) { - console.log( - "cannot work out where gap is, INSERT without previous DELETE! List: ", - op.list - ); - return; - } - // 0,1,2,3 index - // [A,B,C,D] - // DEL 3 - // [A,B,C,_] - // INSERT E 0 - // [E,A,B,C] - // gapIndex=3, op.index=0 - if (gapIndex > op.index) { - // the gap is further down the list, shift every element to the right - // starting at the gap so we can just shift each element in turn: - // [A,B,C,_] gapIndex=3, op.index=0 - // [A,B,C,C] i=3 - // [A,B,B,C] i=2 - // [A,A,B,C] i=1 - // Terminate. We'll assign into op.index next. - for (let i = gapIndex; i > op.index; i--) { - if (indexInRange(op.list, i)) { - activeLists[op.list].roomIndexToRoomId[i] = - activeLists[op.list].roomIndexToRoomId[ - i - 1 - ]; - } - } - } else if (gapIndex < op.index) { - // the gap is further up the list, shift every element to the left - // starting at the gap so we can just shift each element in turn - for (let i = gapIndex; i < op.index; i++) { - if (indexInRange(op.list, i)) { - activeLists[op.list].roomIndexToRoomId[i] = - activeLists[op.list].roomIndexToRoomId[ - i + 1 - ]; - } - } - } - } - accumulateRoomData( - op.room, - rooms.roomIdToRoom[op.room.room_id] !== undefined - ); - activeLists[op.list].roomIndexToRoomId[op.index] = - op.room.room_id; - renderRoomContent(op.room.room_id); - } else if (op.op === "UPDATE") { - console.log("UPDATE", op.list, op.index, op.room.room_id, ";"); - accumulateRoomData(op.room, true); - renderRoomContent(op.room.room_id); - } else if (op.op === "SYNC") { - let syncRooms = []; - const startIndex = op.range[0]; - for (let i = startIndex; i <= op.range[1]; i++) { - const r = op.rooms[i - startIndex]; - if (!r) { - break; // we are at the end of list - } - activeLists[op.list].roomIndexToRoomId[i] = r.room_id; - syncRooms.push(r.room_id); - accumulateRoomData(r); - } - console.log( - "SYNC", - op.list, - op.range[0], - op.range[1], - syncRooms.join(" "), - ";" - ); - } else if (op.op === "INVALIDATE") { - let invalidRooms = []; - const startIndex = op.range[0]; - for (let i = startIndex; i <= op.range[1]; i++) { - invalidRooms.push( - activeLists[op.list].roomIndexToRoomId[i] - ); - delete activeLists[op.list].roomIndexToRoomId[i]; - } - console.log( - "INVALIDATE", - op.list, - op.range[0], - op.range[1], - ";" - ); - } - }); - const roomListElements = document.getElementsByClassName("roomlist"); - for (let i = 0; i < roomListElements.length; i++) { - renderList(roomListElements[i], i); - } - - // check for duplicates and rooms outside tracked ranges which should never happen but can if there's a bug - activeLists.forEach((list, listIndex) => { - let roomIdToPositions = {}; - let dupeRoomIds = new Set(); - let indexesOutsideRanges = new Set(); - Object.keys(list.roomIndexToRoomId).forEach((i) => { - let rid = list.roomIndexToRoomId[i]; - if (!rid) { - return; - } - let positions = roomIdToPositions[rid] || []; - positions.push(i); - roomIdToPositions[rid] = positions; - if (positions.length > 1) { - dupeRoomIds.add(rid); - } - let isInsideRange = false; - list.activeRanges.forEach((r) => { - if (i >= r[0] && i <= r[1]) { - isInsideRange = true; - } - }); - if (!isInsideRange) { - indexesOutsideRanges.add(i); - } - }); - dupeRoomIds.forEach((rid) => { - console.log( - rid, - "in list", - listIndex, - "has duplicate indexes:", - roomIdToPositions[rid] - ); - }); - if (indexesOutsideRanges.size > 0) { - console.log( - "list", - listIndex, - "tracking indexes outside of tracked ranges:", - JSON.stringify([...indexesOutsideRanges]) - ); - } - }); - - devtools.svgify( - document.getElementById("listgraph"), - activeLists, - resp + slidingSync.addRoomDataListener((roomId, roomData, isIncremental) => { + accumulateRoomData( + roomData, + isIncremental + ? isIncremental + : rooms.roomIdToRoom[roomId] !== undefined ); - } - console.log( - "active session: ", - activeSessionId, - " this session: ", - sessionId, - " terminating." - ); + renderRoomContent(roomId); + }); + slidingSync.start(accessToken); }; const randomName = (i, long) => { @@ -702,8 +510,7 @@ window.addEventListener("load", (event) => { document.getElementById("syncButton").onclick = () => { const accessToken = document.getElementById("accessToken").value; window.localStorage.setItem("accessToken", accessToken); - activeSessionId = new Date().getTime() + ""; - doSyncLoop(accessToken, activeSessionId); + doSyncLoop(accessToken); }; document.getElementById("roomfilter").addEventListener("input", (ev) => { const roomNameFilter = ev.target.value; diff --git a/client/sync.js b/client/sync.js index 15f99f8..dae17fc 100644 --- a/client/sync.js +++ b/client/sync.js @@ -7,6 +7,22 @@ import * as devtools from "./devtools.js"; // TODO: explain why const DEFAULT_RANGES = [[0, 20]]; +const REQUIRED_STATE_EVENTS_IN_LIST = [ + ["m.room.avatar", ""], + ["m.room.tombstone", ""], +]; +const REQUIRED_STATE_EVENTS_IN_ROOM = [ + ["m.room.avatar", ""], + ["m.room.topic", ""], +]; + +// Lifecycle state when the /sync response has been fully processed and all room data callbacks +// have been invoked. Never contains an error. +export const LifecycleSyncComplete = 1; +// Lifecycle state when the /sync request has returned. May include an error if there was a problem +// talking to the server. +export const LifecycleSyncRequestFinished = 2; + /** * SlidingSyncConnection is a thin wrapper around fetch() which performs a sliding sync request. * The wrapper persists a small amount of extra data including the total number of tx/rx bytes, @@ -122,10 +138,299 @@ export class SlidingList { } } -class SlidingSync { +/** + * SlidingSync is a high-level data structure which controls the majority of sliding sync. + */ +export class SlidingSync { /** - * - * @param {[]SlidingList} activeLists + * Create a new sliding sync instance + * @param {[]SlidingList} lists The lists to use for sliding sync. + * @param {SlidingSyncConnection} conn The connection to use for /sync calls. */ - constructor(activeLists) {} + constructor(lists, conn) { + this.lists = lists; + this.conn = conn; + this.terminated = false; + this.roomSubscription = ""; + this.roomDataCallbacks = []; + this.lifecycleCallbacks = []; + } + + /** + * Listen for high-level room events on the sync connection + * @param {function} callback The callback to invoke. + */ + addRoomDataListener(callback) { + this.roomDataCallbacks.push(callback); + } + + /** + * Listen for high-level lifecycle events on the sync connection + * @param {function} callback The callback to invoke. + */ + addLifecycleListener(callback) { + this.lifecycleCallbacks.push(callback); + } + + /** + * Invoke all attached room data listeners. + * @param {string} roomId The room which received some data. + * @param {object} roomData The raw sliding sync response JSON. + * @param {boolean} isIncremental True if the roomData is a delta. False if this is the complete snapshot. + */ + _invokeRoomDataListeners(roomId, roomData, isIncremental) { + this.roomDataCallbacks.forEach((callback) => { + callback(roomId, roomData, isIncremental); + }); + } + + _invokeLifecycleListeners(state, resp, err) { + this.lifecycleCallbacks.forEach((callback) => { + callback(state, resp, err); + }); + } + + /** + * Stop syncing with the server. + */ + stop() { + this.terminated = true; + this.conn.abort(); + } + + /** + * Start syncing with the server. Blocks until stopped. + * @param {string} accessToken The access token to sync with. + */ + async start(accessToken) { + let currentPos; + let currentSub = ""; + while (!this.terminated) { + let resp; + try { + // these fields are always required + let reqBody = { + lists: this.lists.map((al) => { + let l = { + ranges: al.activeRanges, + filters: al.getFilters(), + }; + // if this is the first request on this session, send sticky request data which never changes + if (!currentPos) { + l.required_state = REQUIRED_STATE_EVENTS_IN_LIST; + l.timeline_limit = 1; + l.sort = [ + "by_highlight_count", + "by_notification_count", + "by_recency", + ]; + } + return l; + }), + }; + // check if we are (un)subscribing to a room and modify request this one time for it + let subscribingToRoom; + if ( + this.roomSubscription && + currentSub !== this.roomSubscription + ) { + if (currentSub) { + reqBody.unsubscribe_rooms = [currentSub]; + } + reqBody.room_subscriptions = { + [this.roomSubscription]: { + required_state: REQUIRED_STATE_EVENTS_IN_ROOM, + timeline_limit: 30, + }, + }; + // hold a ref to the active room ID as it may change by the time we return from doSyncRequest + subscribingToRoom = this.roomSubscription; + } + resp = await this.conn.doSyncRequest( + accessToken, + currentPos, + reqBody + ); + currentPos = resp.pos; + // update what we think we're subscribed to. + if (subscribingToRoom) { + currentSub = subscribingToRoom; + } + if (!resp.ops) { + resp.ops = []; + } + if (resp.counts) { + resp.counts.forEach((count, index) => { + this.lists[index].joinedCount = count; + }); + } + this._invokeLifecycleListeners( + LifecycleSyncRequestFinished, + resp + ); + } catch (err) { + if (err.name !== "AbortError") { + // XXX: request failed + this._invokeLifecycleListeners( + LifecycleSyncRequestFinished, + null, + err + ); + await sleep(3000); + } + } + if (!resp) { + continue; + } + + Object.keys(resp.room_subscriptions).forEach((roomId) => { + this._invokeRoomDataListeners( + roomId, + resp.room_subscriptions[roomId] + ); + }); + + // TODO: clear gapIndex immediately after next op to avoid a genuine DELETE shifting incorrectly e.g leaving a room + let gapIndexes = {}; + resp.counts.forEach((count, index) => { + gapIndexes[index] = -1; + }); + resp.ops.forEach((op) => { + if (op.op === "DELETE") { + console.log("DELETE", op.list, op.index, ";"); + delete this.lists[op.list].roomIndexToRoomId[op.index]; + gapIndexes[op.list] = op.index; + } else if (op.op === "INSERT") { + console.log( + "INSERT", + op.list, + op.index, + op.room.room_id, + ";" + ); + if (this.lists[op.list].roomIndexToRoomId[op.index]) { + const gapIndex = gapIndexes[op.list]; + // something is in this space, shift items out of the way + if (gapIndex < 0) { + console.log( + "cannot work out where gap is, INSERT without previous DELETE! List: ", + op.list + ); + return; + } + // 0,1,2,3 index + // [A,B,C,D] + // DEL 3 + // [A,B,C,_] + // INSERT E 0 + // [E,A,B,C] + // gapIndex=3, op.index=0 + if (gapIndex > op.index) { + // the gap is further down the list, shift every element to the right + // starting at the gap so we can just shift each element in turn: + // [A,B,C,_] gapIndex=3, op.index=0 + // [A,B,C,C] i=3 + // [A,B,B,C] i=2 + // [A,A,B,C] i=1 + // Terminate. We'll assign into op.index next. + for (let i = gapIndex; i > op.index; i--) { + if (indexInRange(op.list, i)) { + this.lists[op.list].roomIndexToRoomId[i] = + this.lists[op.list].roomIndexToRoomId[ + i - 1 + ]; + } + } + } else if (gapIndex < op.index) { + // the gap is further up the list, shift every element to the left + // starting at the gap so we can just shift each element in turn + for (let i = gapIndex; i < op.index; i++) { + if (indexInRange(op.list, i)) { + this.lists[op.list].roomIndexToRoomId[i] = + this.lists[op.list].roomIndexToRoomId[ + i + 1 + ]; + } + } + } + } + this.lists[op.list].roomIndexToRoomId[op.index] = + op.room.room_id; + this._invokeRoomDataListeners(op.room.room_id, op.room); + } else if (op.op === "UPDATE") { + console.log( + "UPDATE", + op.list, + op.index, + op.room.room_id, + ";" + ); + // TODO: move + // XXX: room data, room ID + this._invokeRoomDataListeners( + op.room.room_id, + op.room, + true + ); + } else if (op.op === "SYNC") { + let syncRooms = []; + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + const r = op.rooms[i - startIndex]; + if (!r) { + break; // we are at the end of list + } + this.lists[op.list].roomIndexToRoomId[i] = r.room_id; + syncRooms.push(r.room_id); + this._invokeRoomDataListeners(r.room_id, r); + } + console.log( + "SYNC", + op.list, + op.range[0], + op.range[1], + syncRooms.join(" "), + ";" + ); + } else if (op.op === "INVALIDATE") { + let invalidRooms = []; + const startIndex = op.range[0]; + for (let i = startIndex; i <= op.range[1]; i++) { + invalidRooms.push( + this.lists[op.list].roomIndexToRoomId[i] + ); + delete this.lists[op.list].roomIndexToRoomId[i]; + } + console.log( + "INVALIDATE", + op.list, + op.range[0], + op.range[1], + ";" + ); + } + }); + + this._invokeLifecycleListeners(LifecycleSyncComplete, resp); + } + } } + +const sleep = (ms) => { + return new Promise((resolve) => setTimeout(resolve, ms)); +}; + +// SYNC 0 2 a b c; SYNC 6 8 d e f; DELETE 7; INSERT 0 e; +// 0 1 2 3 4 5 6 7 8 +// a b c d e f +// a b c d _ f +// e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it +const indexInRange = (listIndex, i) => { + let isInRange = false; + activeLists[listIndex].activeRanges.forEach((r) => { + if (r[0] <= i && i <= r[1]) { + isInRange = true; + } + }); + return isInRange; +};