Integration test

This commit is contained in:
David Robertson 2023-08-10 13:11:58 +01:00
parent 93bac84007
commit 00a9e31ed1
No known key found for this signature in database
GPG Key ID: 903ECE108A39DEDD
2 changed files with 80 additions and 2 deletions

View File

@ -568,12 +568,16 @@ func (h *Handler) startPollerExpiryTicker() {
h.pollerExpiryTicker = time.NewTicker(time.Hour)
go func() {
for range h.pollerExpiryTicker.C {
h.expireOldPollers()
h.ExpireOldPollers()
}
}()
}
func (h *Handler) expireOldPollers() {
// ExpireOldPollers looks for pollers whose devices have not made a sliding sync query
// in the last 30 days, and asks the poller map to expire their corresponding pollers.
// This function does not normally need to be called manually (StartV2Pollers queues it
// up to run hourly); we expose it publicly only for testing purposes.
func (h *Handler) ExpireOldPollers() {
devices, err := h.v2Store.DevicesTable.FindOldDevices(30 * 24 * time.Hour)
if err != nil {
logger.Err(err).Msg("Error fetching old devices")

View File

@ -3,7 +3,10 @@ package syncv3
import (
"encoding/json"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil"
"net/http"
"os"
"testing"
"time"
@ -311,3 +314,74 @@ func TestPollerUpdatesRoomMemberTrackerOnGappySyncStateBlock(t *testing.T) {
m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{bobLeave})),
)
}
func TestPollersCanBeResumedAfterExpiry(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// Start the mock sync v2 server and add a device for alice and for bob.
v2 := runTestV2Server(t)
defer v2.close()
const aliceDevice = "alice_phone"
const bobDevice = "bob_desktop"
v2.addAccountWithDeviceID(alice, aliceDevice, aliceToken)
v2.addAccountWithDeviceID(bob, bobDevice, bobToken)
// Queue up a sync v2 response for both Alice and Bob.
v2.queueResponse(aliceToken, sync2.SyncResponse{NextBatch: "alice_response_1"})
v2.queueResponse(bobToken, sync2.SyncResponse{NextBatch: "bob_response_1"})
// Inject an old token from Alice and a new token from Bob into the DB.
v2Store := sync2.NewStore(pqString, os.Getenv("SYNCV3_SECRET"))
err := sqlutil.WithTransaction(v2Store.DB, func(txn *sqlx.Tx) (err error) {
err = v2Store.DevicesTable.InsertDevice(txn, alice, aliceDevice)
if err != nil {
return
}
err = v2Store.DevicesTable.InsertDevice(txn, bob, bobDevice)
if err != nil {
return
}
_, err = v2Store.TokensTable.Insert(txn, aliceToken, alice, aliceDevice, time.UnixMicro(0))
if err != nil {
return
}
_, err = v2Store.TokensTable.Insert(txn, bobToken, bob, bobDevice, time.Now())
return
})
if err != nil {
t.Fatal(err)
}
t.Log("Start the v3 server and its pollers.")
v3 := runTestServer(t, v2, pqString)
go v3.h2.StartV2Pollers()
defer v3.close()
t.Log("Alice's poller should be active.")
v2.waitUntilEmpty(t, aliceToken)
t.Log("Bob's poller should be active.")
v2.waitUntilEmpty(t, bobToken)
t.Log("Manually trigger a poller cleanup.")
v3.h2.ExpireOldPollers()
t.Log("Queue up a sync v2 response for both Alice and Bob.")
v2.queueResponse(aliceToken, sync2.SyncResponse{NextBatch: "alice_response_2"})
v2.queueResponse(bobToken, sync2.SyncResponse{NextBatch: "bob_response_2"})
t.Log("Wait for Bob's poller to poll")
v2.waitUntilEmpty(t, bobToken)
// Alice's poller has likely already made an HTTP response. But her poller should
// have been terminated before the request was received, so its since token
// should not have been persisted to the DB.
t.Log("Alice's since token in the DB should not have advanced.")
// TODO: surprising that there isn't a function to get the since token for a device!
var since string
err = v2Store.DB.Get(&since, `SELECT since FROM syncv3_sync2_devices WHERE user_id = $1 AND device_id = $2`, alice, aliceDevice)
if err != nil {
t.Fatal(err)
}
if since != "alice_response_1" {
t.Errorf("Alice's sync token in DB was %s, expected alice_response_1", since)
}
}