client: Move sliding sync loop code to sync.js

- use callbacks to propagate room data and notices when the response has been processed.
This commit is contained in:
Kegan Dougal 2022-02-24 11:10:12 +00:00
parent 808d7f8d21
commit f79eb957de
3 changed files with 412 additions and 298 deletions

View File

@ -11,3 +11,5 @@ developers who want to implement Sliding Sync into their clients. The client is
- `sync.js` : Sliding Sync code.
To understand sliding sync, you need to read `index.js` and `sync.js`. The rest of it can be ignored.
The client code uses Prettier as a code formatter.

View File

@ -1,12 +1,16 @@
// This file contains the entry point for the client, as well as DOM interactions.
import { SlidingList, SlidingSyncConnection } from "./sync.js";
import {
SlidingList,
SlidingSyncConnection,
SlidingSync,
LifecycleSyncComplete,
LifecycleSyncRequestFinished,
} from "./sync.js";
import * as render from "./render.js";
import * as devtools from "./devtools.js";
let activeSessionId;
let activeRoomId = ""; // the room currently being viewed
let slidingSync;
let syncConnection = new SlidingSyncConnection();
let activeLists = [
new SlidingList("Direct Messages", {
is_dm: true,
@ -16,15 +20,6 @@ let activeLists = [
}),
];
const requiredStateEventsInList = [
["m.room.avatar", ""],
["m.room.tombstone", ""],
];
const requiredStateEventsInRoom = [
["m.room.avatar", ""],
["m.room.topic", ""],
];
// this is the main data structure the client uses to remember and render rooms. Attach it to
// the window to allow easy introspection.
let rooms = {
@ -190,9 +185,10 @@ const onRoomClick = (e) => {
console.log("failed to find room for onclick");
return;
}
// assign global state
activeRoomId = activeLists[listIndex].roomIndexToRoomId[index];
renderRoomContent(activeRoomId, true);
// assign room subscription
slidingSync.roomSubscription =
activeLists[listIndex].roomIndexToRoomId[index];
renderRoomContent(slidingSync.roomSubscription, true);
// get the highlight on the room
const roomListElements = document.getElementsByClassName("roomlist");
for (let i = 0; i < roomListElements.length; i++) {
@ -203,7 +199,7 @@ const onRoomClick = (e) => {
};
const renderRoomContent = (roomId, refresh) => {
if (roomId !== activeRoomId) {
if (roomId !== slidingSync.roomSubscription) {
return;
}
const container = document.getElementById("messages");
@ -214,11 +210,11 @@ const renderRoomContent = (roomId, refresh) => {
container.removeChild(container.firstChild);
}
}
let room = rooms.roomIdToRoom[activeRoomId];
let room = rooms.roomIdToRoom[slidingSync.roomSubscription];
if (!room) {
console.error(
"renderRoomContent: unknown active room ID ",
activeRoomId
slidingSync.roomSubscription
);
return;
}
@ -327,7 +323,7 @@ const renderList = (container, listIndex) => {
roomCell.getElementsByClassName("roomavatar")[0].src =
"/client/placeholder.svg";
}
if (roomId === activeRoomId) {
if (roomId === slidingSync.roomSubscription) {
roomCell.style = "background: #d7d7f7";
}
if (r.highlight_count > 0) {
@ -364,220 +360,19 @@ const renderList = (container, listIndex) => {
}
}
};
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
// SYNC 0 2 a b c; SYNC 6 8 d e f; DELETE 7; INSERT 0 e;
// 0 1 2 3 4 5 6 7 8
// a b c d e f
// a b c d _ f
// e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it
const indexInRange = (listIndex, i) => {
let isInRange = false;
activeLists[listIndex].activeRanges.forEach((r) => {
if (r[0] <= i && i <= r[1]) {
isInRange = true;
const doSyncLoop = async (accessToken) => {
if (slidingSync) {
console.log("Terminating old loop");
slidingSync.stop();
}
});
return isInRange;
};
const doSyncLoop = async (accessToken, sessionId) => {
console.log(
"Starting sync loop. Active: ",
activeSessionId,
" this:",
sessionId
);
let currentPos;
let currentSub = "";
while (sessionId === activeSessionId) {
let resp;
try {
// these fields are always required
let reqBody = {
lists: activeLists.map((al) => {
let l = {
ranges: al.activeRanges,
filters: al.getFilters(),
};
// if this is the first request on this session, send sticky request data which never changes
if (!currentPos) {
l.required_state = requiredStateEventsInList;
l.timeline_limit = 1;
l.sort = [
"by_highlight_count",
"by_notification_count",
"by_recency",
];
}
return l;
}),
};
// check if we are (un)subscribing to a room and modify request this one time for it
let subscribingToRoom;
if (activeRoomId && currentSub !== activeRoomId) {
if (currentSub) {
reqBody.unsubscribe_rooms = [currentSub];
}
reqBody.room_subscriptions = {
[activeRoomId]: {
required_state: requiredStateEventsInRoom,
timeline_limit: 30,
},
};
// hold a ref to the active room ID as it may change by the time we return from doSyncRequest
subscribingToRoom = activeRoomId;
}
resp = await syncConnection.doSyncRequest(
accessToken,
currentPos,
reqBody
);
currentPos = resp.pos;
// update what we think we're subscribed to.
if (subscribingToRoom) {
currentSub = subscribingToRoom;
}
if (!resp.ops) {
resp.ops = [];
}
if (resp.counts) {
resp.counts.forEach((count, index) => {
activeLists[index].joinedCount = count;
});
}
// reset any error message
document.getElementById("errorMsg").textContent = "";
} catch (err) {
if (err.name !== "AbortError") {
console.error("/sync failed:", err);
document.getElementById("errorMsg").textContent = err;
await sleep(3000);
}
}
if (!resp) {
continue;
}
Object.keys(resp.room_subscriptions).forEach((roomId) => {
accumulateRoomData(
resp.room_subscriptions[roomId],
rooms.roomIdToRoom[roomId] !== undefined
);
renderRoomContent(roomId);
});
// TODO: clear gapIndex immediately after next op to avoid a genuine DELETE shifting incorrectly e.g leaving a room
let gapIndexes = {};
resp.counts.forEach((count, index) => {
gapIndexes[index] = -1;
});
resp.ops.forEach((op) => {
if (op.op === "DELETE") {
console.log("DELETE", op.list, op.index, ";");
delete activeLists[op.list].roomIndexToRoomId[op.index];
gapIndexes[op.list] = op.index;
} else if (op.op === "INSERT") {
console.log("INSERT", op.list, op.index, op.room.room_id, ";");
if (activeLists[op.list].roomIndexToRoomId[op.index]) {
const gapIndex = gapIndexes[op.list];
// something is in this space, shift items out of the way
if (gapIndex < 0) {
console.log(
"cannot work out where gap is, INSERT without previous DELETE! List: ",
op.list
);
return;
}
// 0,1,2,3 index
// [A,B,C,D]
// DEL 3
// [A,B,C,_]
// INSERT E 0
// [E,A,B,C]
// gapIndex=3, op.index=0
if (gapIndex > op.index) {
// the gap is further down the list, shift every element to the right
// starting at the gap so we can just shift each element in turn:
// [A,B,C,_] gapIndex=3, op.index=0
// [A,B,C,C] i=3
// [A,B,B,C] i=2
// [A,A,B,C] i=1
// Terminate. We'll assign into op.index next.
for (let i = gapIndex; i > op.index; i--) {
if (indexInRange(op.list, i)) {
activeLists[op.list].roomIndexToRoomId[i] =
activeLists[op.list].roomIndexToRoomId[
i - 1
];
}
}
} else if (gapIndex < op.index) {
// the gap is further up the list, shift every element to the left
// starting at the gap so we can just shift each element in turn
for (let i = gapIndex; i < op.index; i++) {
if (indexInRange(op.list, i)) {
activeLists[op.list].roomIndexToRoomId[i] =
activeLists[op.list].roomIndexToRoomId[
i + 1
];
}
}
}
}
accumulateRoomData(
op.room,
rooms.roomIdToRoom[op.room.room_id] !== undefined
);
activeLists[op.list].roomIndexToRoomId[op.index] =
op.room.room_id;
renderRoomContent(op.room.room_id);
} else if (op.op === "UPDATE") {
console.log("UPDATE", op.list, op.index, op.room.room_id, ";");
accumulateRoomData(op.room, true);
renderRoomContent(op.room.room_id);
} else if (op.op === "SYNC") {
let syncRooms = [];
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
const r = op.rooms[i - startIndex];
if (!r) {
break; // we are at the end of list
}
activeLists[op.list].roomIndexToRoomId[i] = r.room_id;
syncRooms.push(r.room_id);
accumulateRoomData(r);
}
console.log(
"SYNC",
op.list,
op.range[0],
op.range[1],
syncRooms.join(" "),
";"
);
} else if (op.op === "INVALIDATE") {
let invalidRooms = [];
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
invalidRooms.push(
activeLists[op.list].roomIndexToRoomId[i]
);
delete activeLists[op.list].roomIndexToRoomId[i];
}
console.log(
"INVALIDATE",
op.list,
op.range[0],
op.range[1],
";"
);
}
});
const roomListElements = document.getElementsByClassName("roomlist");
console.log("Starting sync loop");
slidingSync = new SlidingSync(activeLists, syncConnection);
slidingSync.addLifecycleListener((state, resp, err) => {
switch (state) {
case LifecycleSyncComplete:
const roomListElements =
document.getElementsByClassName("roomlist");
for (let i = 0; i < roomListElements.length; i++) {
renderList(roomListElements[i], i);
}
@ -632,14 +427,27 @@ const doSyncLoop = async (accessToken, sessionId) => {
activeLists,
resp
);
break;
case LifecycleSyncRequestFinished:
if (err) {
console.error("/sync failed:", err);
document.getElementById("errorMsg").textContent = err;
} else {
document.getElementById("errorMsg").textContent = "";
}
console.log(
"active session: ",
activeSessionId,
" this session: ",
sessionId,
" terminating."
break;
}
});
slidingSync.addRoomDataListener((roomId, roomData, isIncremental) => {
accumulateRoomData(
roomData,
isIncremental
? isIncremental
: rooms.roomIdToRoom[roomId] !== undefined
);
renderRoomContent(roomId);
});
slidingSync.start(accessToken);
};
const randomName = (i, long) => {
@ -702,8 +510,7 @@ window.addEventListener("load", (event) => {
document.getElementById("syncButton").onclick = () => {
const accessToken = document.getElementById("accessToken").value;
window.localStorage.setItem("accessToken", accessToken);
activeSessionId = new Date().getTime() + "";
doSyncLoop(accessToken, activeSessionId);
doSyncLoop(accessToken);
};
document.getElementById("roomfilter").addEventListener("input", (ev) => {
const roomNameFilter = ev.target.value;

View File

@ -7,6 +7,22 @@ import * as devtools from "./devtools.js";
// TODO: explain why
const DEFAULT_RANGES = [[0, 20]];
const REQUIRED_STATE_EVENTS_IN_LIST = [
["m.room.avatar", ""],
["m.room.tombstone", ""],
];
const REQUIRED_STATE_EVENTS_IN_ROOM = [
["m.room.avatar", ""],
["m.room.topic", ""],
];
// Lifecycle state when the /sync response has been fully processed and all room data callbacks
// have been invoked. Never contains an error.
export const LifecycleSyncComplete = 1;
// Lifecycle state when the /sync request has returned. May include an error if there was a problem
// talking to the server.
export const LifecycleSyncRequestFinished = 2;
/**
* SlidingSyncConnection is a thin wrapper around fetch() which performs a sliding sync request.
* The wrapper persists a small amount of extra data including the total number of tx/rx bytes,
@ -122,10 +138,299 @@ export class SlidingList {
}
}
class SlidingSync {
/**
*
* @param {[]SlidingList} activeLists
* SlidingSync is a high-level data structure which controls the majority of sliding sync.
*/
constructor(activeLists) {}
export class SlidingSync {
/**
* Create a new sliding sync instance
* @param {[]SlidingList} lists The lists to use for sliding sync.
* @param {SlidingSyncConnection} conn The connection to use for /sync calls.
*/
constructor(lists, conn) {
this.lists = lists;
this.conn = conn;
this.terminated = false;
this.roomSubscription = "";
this.roomDataCallbacks = [];
this.lifecycleCallbacks = [];
}
/**
* Listen for high-level room events on the sync connection
* @param {function} callback The callback to invoke.
*/
addRoomDataListener(callback) {
this.roomDataCallbacks.push(callback);
}
/**
* Listen for high-level lifecycle events on the sync connection
* @param {function} callback The callback to invoke.
*/
addLifecycleListener(callback) {
this.lifecycleCallbacks.push(callback);
}
/**
* Invoke all attached room data listeners.
* @param {string} roomId The room which received some data.
* @param {object} roomData The raw sliding sync response JSON.
* @param {boolean} isIncremental True if the roomData is a delta. False if this is the complete snapshot.
*/
_invokeRoomDataListeners(roomId, roomData, isIncremental) {
this.roomDataCallbacks.forEach((callback) => {
callback(roomId, roomData, isIncremental);
});
}
_invokeLifecycleListeners(state, resp, err) {
this.lifecycleCallbacks.forEach((callback) => {
callback(state, resp, err);
});
}
/**
* Stop syncing with the server.
*/
stop() {
this.terminated = true;
this.conn.abort();
}
/**
* Start syncing with the server. Blocks until stopped.
* @param {string} accessToken The access token to sync with.
*/
async start(accessToken) {
let currentPos;
let currentSub = "";
while (!this.terminated) {
let resp;
try {
// these fields are always required
let reqBody = {
lists: this.lists.map((al) => {
let l = {
ranges: al.activeRanges,
filters: al.getFilters(),
};
// if this is the first request on this session, send sticky request data which never changes
if (!currentPos) {
l.required_state = REQUIRED_STATE_EVENTS_IN_LIST;
l.timeline_limit = 1;
l.sort = [
"by_highlight_count",
"by_notification_count",
"by_recency",
];
}
return l;
}),
};
// check if we are (un)subscribing to a room and modify request this one time for it
let subscribingToRoom;
if (
this.roomSubscription &&
currentSub !== this.roomSubscription
) {
if (currentSub) {
reqBody.unsubscribe_rooms = [currentSub];
}
reqBody.room_subscriptions = {
[this.roomSubscription]: {
required_state: REQUIRED_STATE_EVENTS_IN_ROOM,
timeline_limit: 30,
},
};
// hold a ref to the active room ID as it may change by the time we return from doSyncRequest
subscribingToRoom = this.roomSubscription;
}
resp = await this.conn.doSyncRequest(
accessToken,
currentPos,
reqBody
);
currentPos = resp.pos;
// update what we think we're subscribed to.
if (subscribingToRoom) {
currentSub = subscribingToRoom;
}
if (!resp.ops) {
resp.ops = [];
}
if (resp.counts) {
resp.counts.forEach((count, index) => {
this.lists[index].joinedCount = count;
});
}
this._invokeLifecycleListeners(
LifecycleSyncRequestFinished,
resp
);
} catch (err) {
if (err.name !== "AbortError") {
// XXX: request failed
this._invokeLifecycleListeners(
LifecycleSyncRequestFinished,
null,
err
);
await sleep(3000);
}
}
if (!resp) {
continue;
}
Object.keys(resp.room_subscriptions).forEach((roomId) => {
this._invokeRoomDataListeners(
roomId,
resp.room_subscriptions[roomId]
);
});
// TODO: clear gapIndex immediately after next op to avoid a genuine DELETE shifting incorrectly e.g leaving a room
let gapIndexes = {};
resp.counts.forEach((count, index) => {
gapIndexes[index] = -1;
});
resp.ops.forEach((op) => {
if (op.op === "DELETE") {
console.log("DELETE", op.list, op.index, ";");
delete this.lists[op.list].roomIndexToRoomId[op.index];
gapIndexes[op.list] = op.index;
} else if (op.op === "INSERT") {
console.log(
"INSERT",
op.list,
op.index,
op.room.room_id,
";"
);
if (this.lists[op.list].roomIndexToRoomId[op.index]) {
const gapIndex = gapIndexes[op.list];
// something is in this space, shift items out of the way
if (gapIndex < 0) {
console.log(
"cannot work out where gap is, INSERT without previous DELETE! List: ",
op.list
);
return;
}
// 0,1,2,3 index
// [A,B,C,D]
// DEL 3
// [A,B,C,_]
// INSERT E 0
// [E,A,B,C]
// gapIndex=3, op.index=0
if (gapIndex > op.index) {
// the gap is further down the list, shift every element to the right
// starting at the gap so we can just shift each element in turn:
// [A,B,C,_] gapIndex=3, op.index=0
// [A,B,C,C] i=3
// [A,B,B,C] i=2
// [A,A,B,C] i=1
// Terminate. We'll assign into op.index next.
for (let i = gapIndex; i > op.index; i--) {
if (indexInRange(op.list, i)) {
this.lists[op.list].roomIndexToRoomId[i] =
this.lists[op.list].roomIndexToRoomId[
i - 1
];
}
}
} else if (gapIndex < op.index) {
// the gap is further up the list, shift every element to the left
// starting at the gap so we can just shift each element in turn
for (let i = gapIndex; i < op.index; i++) {
if (indexInRange(op.list, i)) {
this.lists[op.list].roomIndexToRoomId[i] =
this.lists[op.list].roomIndexToRoomId[
i + 1
];
}
}
}
}
this.lists[op.list].roomIndexToRoomId[op.index] =
op.room.room_id;
this._invokeRoomDataListeners(op.room.room_id, op.room);
} else if (op.op === "UPDATE") {
console.log(
"UPDATE",
op.list,
op.index,
op.room.room_id,
";"
);
// TODO: move
// XXX: room data, room ID
this._invokeRoomDataListeners(
op.room.room_id,
op.room,
true
);
} else if (op.op === "SYNC") {
let syncRooms = [];
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
const r = op.rooms[i - startIndex];
if (!r) {
break; // we are at the end of list
}
this.lists[op.list].roomIndexToRoomId[i] = r.room_id;
syncRooms.push(r.room_id);
this._invokeRoomDataListeners(r.room_id, r);
}
console.log(
"SYNC",
op.list,
op.range[0],
op.range[1],
syncRooms.join(" "),
";"
);
} else if (op.op === "INVALIDATE") {
let invalidRooms = [];
const startIndex = op.range[0];
for (let i = startIndex; i <= op.range[1]; i++) {
invalidRooms.push(
this.lists[op.list].roomIndexToRoomId[i]
);
delete this.lists[op.list].roomIndexToRoomId[i];
}
console.log(
"INVALIDATE",
op.list,
op.range[0],
op.range[1],
";"
);
}
});
this._invokeLifecycleListeners(LifecycleSyncComplete, resp);
}
}
}
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
// SYNC 0 2 a b c; SYNC 6 8 d e f; DELETE 7; INSERT 0 e;
// 0 1 2 3 4 5 6 7 8
// a b c d e f
// a b c d _ f
// e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it
const indexInRange = (listIndex, i) => {
let isInRange = false;
activeLists[listIndex].activeRanges.forEach((r) => {
if (r[0] <= i && i <= r[1]) {
isInRange = true;
}
});
return isInRange;
};