Batch together updates

Previously, when live streaming, you could only get one update per request.
We now batch updates together until there are no more incoming events.
This commit is contained in:
Kegan Dougal 2021-12-15 09:51:25 +00:00
parent 502d3b5852
commit 0dff964705
5 changed files with 67 additions and 29 deletions

View File

@ -15,6 +15,9 @@ func DeviceIDFromRequest(req *http.Request) (string, error) {
return "", fmt.Errorf("missing Authorization header")
}
accessToken := strings.TrimPrefix(ah, "Bearer ")
// important that this is a cryptographically secure hash function to prevent
// preimage attacks where Eve can use a fake token to hash to an existing device ID
// on the server.
hash := sha256.New()
hash.Write([]byte(accessToken))
return hex.EncodeToString(hash.Sum(nil)), nil

24
internal/request_test.go Normal file
View File

@ -0,0 +1,24 @@
package internal
import (
"net/http"
"testing"
)
// TestDeviceIDFromRequest checks that two requests with different access
// tokens hash to two different device IDs.
func TestDeviceIDFromRequest(t *testing.T) {
	// Fixed: the error from http.NewRequest was previously discarded with _;
	// fail fast if the request cannot even be constructed.
	req, err := http.NewRequest("POST", "http://localhost:8008", nil)
	if err != nil {
		t.Fatalf("http.NewRequest returned %s", err)
	}
	req.Header.Set("Authorization", "Bearer A")
	deviceIDA, err := DeviceIDFromRequest(req)
	if err != nil {
		t.Fatalf("DeviceIDFromRequest returned %s", err)
	}
	req.Header.Set("Authorization", "Bearer B")
	deviceIDB, err := DeviceIDFromRequest(req)
	if err != nil {
		t.Fatalf("DeviceIDFromRequest returned %s", err)
	}
	if deviceIDA == deviceIDB {
		t.Fatalf("DeviceIDFromRequest: hashed to same device ID: %s", deviceIDA)
	}
}

View File

@ -174,33 +174,10 @@ blockloop:
case <-time.After(time.Duration(req.TimeoutSecs()) * time.Second):
break blockloop
case connEvent := <-s.updateEvents: // TODO: keep reading until it is empty before responding.
if connEvent.roomMetadata != nil {
// always update our view of the world
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
// TODO: yuck that the index is here
deletedIndex := list.UpdateGlobalRoomMetadata(connEvent.roomMetadata)
if deletedIndex >= 0 && s.muxedReq.Lists[index].Rooms.Inside(int64(deletedIndex)) {
responseOperations = append(responseOperations, &sync3.ResponseOpSingle{
List: index,
Operation: sync3.OpDelete,
Index: &deletedIndex,
})
}
})
}
if connEvent.msg != nil {
subs, ops := s.processIncomingEvent(connEvent.msg)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
}
if connEvent.userMsg.msg != nil {
subs, ops := s.processIncomingUserEvent(connEvent.roomID, connEvent.userMsg.msg, connEvent.userMsg.hasCountDecreased)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
responseOperations = s.processLiveEvent(connEvent, responseOperations, response)
for len(s.updateEvents) > 0 {
connEvent = <-s.updateEvents
responseOperations = s.processLiveEvent(connEvent, responseOperations, response)
}
}
}
@ -211,6 +188,38 @@ blockloop:
return response, nil
}
// processLiveEvent applies a single update received from s.updateEvents to the
// connection state. It appends any resulting list operations to
// responseOperations and records room subscription updates in response,
// returning the (possibly grown) responseOperations slice.
// A ConnEvent may carry any combination of roomMetadata, msg, and userMsg;
// each non-nil part is processed independently below.
func (s *ConnState) processLiveEvent(connEvent *ConnEvent, responseOperations []sync3.ResponseOp, response *sync3.Response) []sync3.ResponseOp {
if connEvent.roomMetadata != nil {
// always update our view of the world
s.lists.ForEach(func(index int, list *sync3.FilteredSortableRooms) {
// TODO: yuck that the index is here
deletedIndex := list.UpdateGlobalRoomMetadata(connEvent.roomMetadata)
// Only emit a DELETE op when the deleted index falls inside the window
// the client requested for this list; otherwise the client never saw it.
if deletedIndex >= 0 && s.muxedReq.Lists[index].Rooms.Inside(int64(deletedIndex)) {
responseOperations = append(responseOperations, &sync3.ResponseOpSingle{
List: index,
Operation: sync3.OpDelete,
Index: &deletedIndex,
})
}
})
}
// A room event arrived: delegate to processIncomingEvent, which may yield
// both list operations and room subscription data.
if connEvent.msg != nil {
subs, ops := s.processIncomingEvent(connEvent.msg)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
}
// A user-specific event (presumably unread counts, given hasCountDecreased —
// TODO confirm) arrived for connEvent.roomID: same merge pattern as above.
if connEvent.userMsg.msg != nil {
subs, ops := s.processIncomingUserEvent(connEvent.roomID, connEvent.userMsg.msg, connEvent.userMsg.hasCountDecreased)
responseOperations = append(responseOperations, ops...)
for _, sub := range subs {
response.RoomSubscriptions[sub.RoomID] = sub
}
}
return responseOperations
}
func (s *ConnState) onIncomingListRequest(listIndex int, prevReqList, nextReqList *sync3.RequestList) []sync3.ResponseOp {
roomList := s.lists.List(listIndex)
// TODO: calculate the M values for N < M calcs

View File

@ -55,7 +55,9 @@ func NewSync3Handler(v2Client sync2.Client, postgresDBURI string) (*SyncLiveHand
userCaches: &sync.Map{},
Dispatcher: sync3.NewDispatcher(),
GlobalCache: sync3.NewGlobalCache(store),
Extensions: &extensions.Handler{store},
Extensions: &extensions.Handler{
Store: store,
},
}
sh.PollerMap = sync2.NewPollerMap(v2Client, sh)
roomToJoinedUsers, err := store.AllJoinedMembers()

View File

@ -269,7 +269,7 @@ func MatchRoomName(name string) roomMatcher {
func MatchRoomRequiredState(events []json.RawMessage) roomMatcher {
return func(r sync3.Room) error {
if len(r.RequiredState) != len(events) {
return fmt.Errorf("required state length mismatchm got %d want %d", len(r.RequiredState), len(events))
return fmt.Errorf("required state length mismatch, got %d want %d", len(r.RequiredState), len(events))
}
// allow any ordering for required state
for _, want := range events {