Use gmsl.Timestamp in more places

This commit is contained in:
Kegan Dougal 2021-10-26 10:01:45 +01:00
parent 1510dce73a
commit eaea3402a2
9 changed files with 80 additions and 57 deletions

View File

@ -6,7 +6,6 @@ import (
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/sync2"
"github.com/matrix-org/sync-v3/sync3"
"github.com/matrix-org/sync-v3/testutils"
@ -14,7 +13,7 @@ import (
func TestInteg(t *testing.T) {
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2)
v3 := runTestServer(t, v2, "")
defer v2.close()
defer v3.close()
alice := "@alice:localhost"
@ -30,7 +29,7 @@ func TestInteg(t *testing.T) {
testutils.NewStateEvent(t, "m.room.create", "", alice, map[string]interface{}{"creator": alice}),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewStateEvent(t, "m.room.join_rules", "", alice, map[string]interface{}{"join_rule": "public"}),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello world"}, int64(gomatrixserverlib.AsTimestamp(time.Now()))),
testutils.NewEvent(t, "m.room.message", alice, map[string]interface{}{"body": "hello world"}, time.Now()),
},
},
},

View File

@ -5,7 +5,9 @@ import (
"encoding/json"
"reflect"
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/testutils"
)
@ -222,6 +224,7 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
t.Fatalf("LatestEventNID: %s", err)
}
baseTimestamp := gomatrixserverlib.Timestamp(1632131678061).Time()
// Test the examples
// Stream Positions
// 1 2 3 4 5 6 7 8 9 10
@ -236,30 +239,30 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
RoomID: roomA,
Events: []json.RawMessage{
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
},
},
{
RoomID: roomB,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
},
},
{
RoomID: roomA,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
},
},
{
RoomID: roomC,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
},
},
}
@ -335,30 +338,30 @@ func TestVisibleEventNIDsBetween(t *testing.T) {
{
RoomID: roomE,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, bob, map[string]interface{}{"membership": "invite"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp.Add(1*time.Second)),
},
},
{
RoomID: roomD,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp.Add(1*time.Second)),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "leave"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp.Add(1*time.Second)),
},
},
{
RoomID: roomE,
Events: []json.RawMessage{
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{"membership": "join"}),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, 1632131678061),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
testutils.NewEvent(t, "m.room.message", bob, map[string]interface{}{}, baseTimestamp),
},
},
}

View File

@ -194,7 +194,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
// do an UPDATE if the most recent room gets a 2nd event.
var targetRoom SortableRoom
fromIndex, ok := s.sortedJoinedRoomsPositions[updateEvent.roomID]
var lastTimestamp int64
var lastTimestamp uint64
if !ok {
// the user may have just joined the room hence not have an entry in this list yet.
fromIndex = len(s.sortedJoinedRooms)
@ -220,7 +220,7 @@ func (s *ConnState) onIncomingRequest(ctx context.Context, req *Request) (*Respo
}
toIndex := s.sortedJoinedRoomsPositions[updateEvent.roomID]
logger.Info().Int("from", fromIndex).Int("to", toIndex).
Int64("prev_ts", lastTimestamp).Int64("event_ts", updateEvent.timestamp).
Uint64("prev_ts", lastTimestamp).Uint64("event_ts", updateEvent.timestamp).
Interface("room", targetRoom.RoomID).Msg("moved!")
// the toIndex may not be inside a tracked range. If it isn't, we actually need to notify about a
// different room

View File

@ -9,14 +9,15 @@ import (
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/testutils"
)
func newSortableRoom(roomID string, lastMsgTimestamp int64) SortableRoom {
func newSortableRoom(roomID string, lastMsgTimestamp gomatrixserverlib.Timestamp) SortableRoom {
return SortableRoom{
RoomID: roomID,
Name: "Room " + roomID,
LastMessageTimestamp: lastMsgTimestamp,
LastMessageTimestamp: uint64(lastMsgTimestamp),
LastEventJSON: json.RawMessage(
fmt.Sprintf(`{"type":"m.room.message","content":{"body":"hello"},"origin_server_ts":%d}`, lastMsgTimestamp),
),
@ -43,11 +44,11 @@ func TestConnStateInitial(t *testing.T) {
DeviceID: "d",
}
userID := "@TestConnStateInitial_alice:localhost"
timestampNow := int64(1632131678061)
timestampNow := gomatrixserverlib.Timestamp(1632131678061).Time()
// initial sort order B, C, A
roomA := newSortableRoom("!a:localhost", timestampNow-8000)
roomB := newSortableRoom("!b:localhost", timestampNow)
roomC := newSortableRoom("!c:localhost", timestampNow-4000)
roomA := newSortableRoom("!a:localhost", gomatrixserverlib.AsTimestamp(timestampNow.Add(-8*time.Second)))
roomB := newSortableRoom("!b:localhost", gomatrixserverlib.AsTimestamp(timestampNow))
roomC := newSortableRoom("!c:localhost", gomatrixserverlib.AsTimestamp(timestampNow.Add(-4*time.Second)))
globalCache := NewGlobalCache(nil)
globalCache.AssignRoom(roomA)
globalCache.AssignRoom(roomB)
@ -114,7 +115,7 @@ func TestConnStateInitial(t *testing.T) {
// bump A to the top
globalCache.OnNewEvents(roomA.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+1000),
testutils.NewEvent(t, "unimportant", "me", struct{}{}, gomatrixserverlib.AsTimestamp(timestampNow.Add(1*time.Second)).Time()),
}, 1)
// request again for the diff
@ -146,7 +147,7 @@ func TestConnStateInitial(t *testing.T) {
// another message should just update
globalCache.OnNewEvents(roomA.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000),
testutils.NewEvent(t, "unimportant", "me", struct{}{}, gomatrixserverlib.AsTimestamp(timestampNow.Add(2*time.Second)).Time()),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -178,18 +179,18 @@ func TestConnStateMultipleRanges(t *testing.T) {
DeviceID: "d",
}
userID := "@TestConnStateMultipleRanges_alice:localhost"
timestampNow := int64(1632131678061)
timestampNow := gomatrixserverlib.Timestamp(1632131678061)
var rooms []SortableRoom
var roomIDs []string
globalCache := NewGlobalCache(nil)
roomIDToRoom := make(map[string]SortableRoom)
for i := 0; i < 10; i++ {
for i := int64(0); i < 10; i++ {
roomID := fmt.Sprintf("!%d:localhost", i)
room := SortableRoom{
RoomID: roomID,
Name: fmt.Sprintf("Room %d", i),
// room 1 is most recent, 10 is least recent
LastMessageTimestamp: timestampNow - int64(i*1000),
LastMessageTimestamp: uint64(uint64(timestampNow) - uint64(i*1000)),
LastEventJSON: []byte(`{}`),
}
rooms = append(rooms, room)
@ -273,7 +274,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
// 8,0,1,2,3,4,5,6,7,9
//
globalCache.OnNewEvents(roomIDs[8], []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000),
testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow.Time().Add(2*time.Second)),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
@ -310,7 +311,7 @@ func TestConnStateMultipleRanges(t *testing.T) {
// 8,0,1,9,2,3,4,5,6,7 room
middleTimestamp := int64((roomIDToRoom[roomIDs[1]].LastMessageTimestamp + roomIDToRoom[roomIDs[2]].LastMessageTimestamp) / 2)
globalCache.OnNewEvents(roomIDs[9], []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, middleTimestamp),
testutils.NewEvent(t, "unimportant", "me", struct{}{}, gomatrixserverlib.Timestamp(middleTimestamp).Time()),
}, 1)
res, err = cs.HandleIncomingRequest(context.Background(), connID, &Request{
Sort: []string{SortByRecency},
@ -346,7 +347,7 @@ func TestBumpToOutsideRange(t *testing.T) {
DeviceID: "d",
}
userID := "@TestBumpToOutsideRange_alice:localhost"
timestampNow := int64(1632131678061)
timestampNow := gomatrixserverlib.Timestamp(1632131678061)
roomA := newSortableRoom("!a:localhost", timestampNow)
roomB := newSortableRoom("!b:localhost", timestampNow-1000)
roomC := newSortableRoom("!c:localhost", timestampNow-2000)
@ -398,7 +399,7 @@ func TestBumpToOutsideRange(t *testing.T) {
// D gets bumped to C's position but it's still outside the range so nothing should happen
globalCache.OnNewEvents(roomD.RoomID, []json.RawMessage{
testutils.NewEvent(t, "unimportant", "me", struct{}{}, roomC.LastMessageTimestamp+2),
testutils.NewEvent(t, "unimportant", "me", struct{}{}, gomatrixserverlib.Timestamp(roomC.LastMessageTimestamp+2).Time()),
}, 1)
// expire the context after 10ms so we don't wait forevar
@ -425,11 +426,11 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
DeviceID: "d",
}
userID := "@TestConnStateRoomSubscriptions_alice:localhost"
timestampNow := int64(1632131678061)
timestampNow := gomatrixserverlib.Timestamp(1632131678061)
roomA := newSortableRoom("!a:localhost", timestampNow)
roomB := newSortableRoom("!b:localhost", timestampNow-1000)
roomC := newSortableRoom("!c:localhost", timestampNow-2000)
roomD := newSortableRoom("!d:localhost", timestampNow-3000)
roomB := newSortableRoom("!b:localhost", gomatrixserverlib.Timestamp(timestampNow-1000))
roomC := newSortableRoom("!c:localhost", gomatrixserverlib.Timestamp(timestampNow-2000))
roomD := newSortableRoom("!d:localhost", gomatrixserverlib.Timestamp(timestampNow-3000))
roomIDs := []string{roomA.RoomID, roomB.RoomID, roomC.RoomID, roomD.RoomID}
globalCache := NewGlobalCache(nil)
globalCache.AssignRoom(roomA)
@ -508,7 +509,7 @@ func TestConnStateRoomSubscriptions(t *testing.T) {
},
})
// room D gets a new event
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, timestampNow+2000)
newEvent := testutils.NewEvent(t, "unimportant", "me", struct{}{}, gomatrixserverlib.Timestamp(timestampNow+2000).Time())
globalCache.OnNewEvents(roomD.RoomID, []json.RawMessage{
newEvent,
}, 1)

View File

@ -4,8 +4,8 @@ import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/sync-v3/state"
"github.com/tidwall/gjson"
)
@ -178,7 +178,7 @@ func (c *GlobalCache) onNewEvent(
} else if eventType == "m.room.canonical_alias" && stateKey != nil && *stateKey == "" && globalRoom.Name == "" {
globalRoom.Name = ev.Get("content.alias").Str
}
eventTimestamp := ev.Get("origin_server_ts").Int()
eventTimestamp := ev.Get("origin_server_ts").Uint()
globalRoom.LastMessageTimestamp = eventTimestamp
globalRoom.LastEventJSON = event
c.globalRoomInfo[globalRoom.RoomID] = globalRoom
@ -242,7 +242,7 @@ func PopulateGlobalCache(store *state.Storage, cache *GlobalCache) error {
RoomID: ev.RoomID,
}
room.LastEventJSON = ev.JSON
room.LastMessageTimestamp = gjson.ParseBytes(ev.JSON).Get("origin_server_ts").Int()
room.LastMessageTimestamp = gjson.ParseBytes(ev.JSON).Get("origin_server_ts").Uint()
cache.AssignRoom(*room)
}
// load state events we care about for sync v3
@ -265,7 +265,7 @@ func PopulateGlobalCache(store *state.Storage, cache *GlobalCache) error {
}
}
cache.AssignRoom(*room)
fmt.Printf("Room: %s - %s - %s \n", room.RoomID, room.Name, time.Unix(room.LastMessageTimestamp/1000, 0))
fmt.Printf("Room: %s - %s - %s \n", room.RoomID, room.Name, gomatrixserverlib.Timestamp(room.LastMessageTimestamp).Time())
}
// populate joined rooms tracker

View File

@ -31,7 +31,7 @@ type EventData struct {
eventType string
stateKey *string
content gjson.Result
timestamp int64
timestamp uint64
// TODO: remove or factor out
userRoomData *UserRoomData

View File

@ -18,7 +18,7 @@ type Room struct {
type SortableRoom struct {
RoomID string
Name string // by_name
LastMessageTimestamp int64 // by_recency
LastMessageTimestamp uint64 // by_recency
LastEventJSON json.RawMessage
}

View File

@ -5,6 +5,9 @@ import (
"fmt"
"sync"
"testing"
"time"
"github.com/matrix-org/gomatrixserverlib"
)
var (
@ -21,14 +24,15 @@ func generateEventID(t *testing.T) string {
return fmt.Sprintf("$event_%d_%s", eventIDCounter, t.Name())
}
func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interface{}) json.RawMessage {
func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interface{}, ts ...time.Time) json.RawMessage {
t.Helper()
e := struct {
Type string `json:"type"`
StateKey string `json:"state_key"`
Sender string `json:"sender"`
Content interface{} `json:"content"`
EventID string `json:"event_id"`
Type string `json:"type"`
StateKey string `json:"state_key"`
Sender string `json:"sender"`
Content interface{} `json:"content"`
EventID string `json:"event_id"`
OriginServerTS int64 `json:"origin_server_ts"`
}{
Type: evType,
StateKey: stateKey,
@ -36,6 +40,11 @@ func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interf
Content: content,
EventID: generateEventID(t),
}
if len(ts) == 0 {
e.OriginServerTS = int64(gomatrixserverlib.AsTimestamp(time.Now()))
} else {
e.OriginServerTS = int64(gomatrixserverlib.AsTimestamp(ts[0]))
}
j, err := json.Marshal(&e)
if err != nil {
t.Fatalf("failed to make event JSON: %s", err)
@ -43,7 +52,7 @@ func NewStateEvent(t *testing.T, evType, stateKey, sender string, content interf
return j
}
func NewEvent(t *testing.T, evType, sender string, content interface{}, originServerTs int64) json.RawMessage {
func NewEvent(t *testing.T, evType, sender string, content interface{}, originServerTs time.Time) json.RawMessage {
t.Helper()
e := struct {
Type string `json:"type"`
@ -56,7 +65,7 @@ func NewEvent(t *testing.T, evType, sender string, content interface{}, originSe
Sender: sender,
Content: content,
EventID: generateEventID(t),
TS: originServerTs,
TS: int64(gomatrixserverlib.AsTimestamp(originServerTs)),
}
j, err := json.Marshal(&e)
if err != nil {

View File

@ -126,6 +126,15 @@ func (s *testV3Server) close() {
s.srv.Close()
}
func (s *testV3Server) restart(t *testing.T, v2 *testV2Server, pq string) {
t.Helper()
log.Printf("restarting server")
s.close()
ss := runTestServer(t, v2, pq)
s.srv = ss.srv
v2.srv.CloseClientConnections() // kick-over v2 conns
}
func (s *testV3Server) mustDoV3Request(t *testing.T, token string, reqBody interface{}) (respBody *sync3.Response) {
t.Helper()
resp, code := s.doV3Request(t, token, reqBody)
@ -173,9 +182,11 @@ func (s *testV3Server) doV3Request(t *testing.T, token string, reqBody interface
return &r, resp.StatusCode
}
func runTestServer(t *testing.T, v2Server *testV2Server) *testV3Server {
func runTestServer(t *testing.T, v2Server *testV2Server, postgresConnectionString string) *testV3Server {
t.Helper()
postgresConnectionString := testutils.PrepareDBConnectionString("syncv3_test_sync3_integration")
if postgresConnectionString == "" {
postgresConnectionString = testutils.PrepareDBConnectionString("syncv3_test_sync3_integration")
}
h, err := sync3.NewSync3Handler(&sync2.HTTPClient{
Client: &http.Client{
Timeout: 5 * time.Minute,