Add stream IDs to typing

This commit is contained in:
Kegan Dougal 2021-06-10 17:08:06 +01:00
parent 4e91518b7c
commit b7c8a66203
4 changed files with 35 additions and 17 deletions

View File

@ -325,12 +325,13 @@ func (a *Accumulator) Delta(roomID string, lastEventNID int64, limit int) (event
return eventsJSON, int64(events[len(events)-1].NID), nil
}
// Typing returns who is currently typing in this room
func (a *Accumulator) Typing(roomID string) ([]string, error) {
return a.typingTable.Typing(roomID)
// Typing returns who is currently typing in this room along with the latest stream ID.
func (a *Accumulator) Typing(roomID string, streamID int) ([]string, int, error) {
return a.typingTable.Typing(roomID, streamID)
}
// SetTyping sets who is typing in the room. An empty list removes all typing users.
func (a *Accumulator) SetTyping(roomID string, userIDs []string) error {
// SetTyping sets who is typing in the room. An empty list removes all typing users. Returns the
// stream ID of the newly inserted typing users.
func (a *Accumulator) SetTyping(roomID string, userIDs []string) (int, error) {
return a.typingTable.SetTyping(roomID, userIDs)
}

View File

@ -17,7 +17,9 @@ type TypingTable struct {
func NewTypingTable(db *sqlx.DB) *TypingTable {
// make sure tables are made
db.MustExec(`
CREATE SEQUENCE IF NOT EXISTS syncv3_typing_seq;
CREATE TABLE IF NOT EXISTS syncv3_typing (
stream_id BIGINT NOT NULL DEFAULT nextval('syncv3_typing_seq'),
room_id TEXT NOT NULL PRIMARY KEY,
user_ids TEXT[] NOT NULL
);
@ -25,21 +27,25 @@ func NewTypingTable(db *sqlx.DB) *TypingTable {
return &TypingTable{db}
}
func (t *TypingTable) SetTyping(roomID string, userIDs []string) (err error) {
func (t *TypingTable) SetTyping(roomID string, userIDs []string) (streamID int, err error) {
if userIDs == nil {
userIDs = []string{}
}
_, err = t.db.Exec(`
INSERT INTO syncv3_typing(room_id, user_ids) VALUES($1, $2)
ON CONFLICT (room_id) DO UPDATE SET user_ids = $2`, roomID, pq.Array(userIDs))
return err
err = t.db.QueryRow(`
INSERT INTO syncv3_typing(room_id, user_ids) VALUES($1, $2)
ON CONFLICT (room_id) DO UPDATE SET user_ids = $2, stream_id = nextval('syncv3_typing_seq') RETURNING stream_id`,
roomID, pq.Array(userIDs),
).Scan(&streamID)
return streamID, err
}
func (t *TypingTable) Typing(roomID string) (userIDs []string, err error) {
func (t *TypingTable) Typing(roomID string, streamID int) (userIDs []string, newStreamID int, err error) {
var userIDsArray pq.StringArray
err = t.db.QueryRow(`SELECT user_ids FROM syncv3_typing WHERE room_id=$1`, roomID).Scan(&userIDsArray)
err = t.db.QueryRow(
`SELECT stream_id, user_ids FROM syncv3_typing WHERE room_id=$1 AND stream_id > $2`, roomID, streamID,
).Scan(&newStreamID, &userIDsArray)
if err == sql.ErrNoRows {
err = nil
}
return userIDsArray, err
return userIDsArray, newStreamID, err
}

View File

@ -18,19 +18,30 @@ func TestTypingTable(t *testing.T) {
}
roomID := "!foo:localhost"
table := NewTypingTable(db)
lastStreamID := -1
setAndCheck := func() {
err = table.SetTyping(roomID, userIDs)
streamID, err := table.SetTyping(roomID, userIDs)
if err != nil {
t.Fatalf("failed to SetTyping: %s", err)
}
gotUserIDs, err := table.Typing(roomID)
if streamID == 0 {
t.Errorf("SetTyping: streamID was not returned")
}
if lastStreamID >= streamID {
t.Errorf("SetTyping: streamID returned should always be increasing but it wasn't, got %d, last %d", streamID, lastStreamID)
}
lastStreamID = streamID
gotUserIDs, newStreamID, err := table.Typing(roomID, streamID-1)
if err != nil {
t.Fatalf("failed to Typing: %s", err)
}
if !reflect.DeepEqual(gotUserIDs, userIDs) {
t.Errorf("got typing users %v want %v", gotUserIDs, userIDs)
}
if newStreamID != streamID {
t.Errorf("Typing did not return the same stream ID as SetTyping, got %d want %d", newStreamID, streamID)
}
}
setAndCheck()
userIDs = userIDs[1:]

View File

@ -117,7 +117,7 @@ func (p *Poller) accumulate(res *SyncResponse) {
userIDs = append(userIDs, u.Str)
}
}
err = p.accumulator.SetTyping(roomID, userIDs)
_, err = p.accumulator.SetTyping(roomID, userIDs)
if err != nil {
p.logger.Err(err).Str("room_id", roomID).Strs("user_ids", userIDs).Msg("Accumulator: failed to set typing")
}
@ -141,5 +141,5 @@ type clientInterface interface {
type accumulatorInterface interface {
Accumulate(roomID string, timeline []json.RawMessage) error
Initialise(roomID string, state []json.RawMessage) error
SetTyping(roomID string, userIDs []string) error
SetTyping(roomID string, userIDs []string) (int, error)
}