Merge pull request #337 from matrix-org/kegan/data-race-poller

Fix race condition in test
This commit is contained in:
kegsay 2023-10-11 13:16:19 +01:00 committed by GitHub
commit fe624d5ad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 67 additions and 21 deletions

View File

@ -10,12 +10,31 @@ import (
"github.com/matrix-org/sliding-sync/pubsub"
)
type syncSlice[T any] struct {
slice []T
mu sync.Mutex
}
func (s *syncSlice[T]) append(item T) {
s.mu.Lock()
defer s.mu.Unlock()
s.slice = append(s.slice, item)
}
func (s *syncSlice[T]) clone() []T {
s.mu.Lock()
defer s.mu.Unlock()
result := make([]T, len(s.slice))
copy(result, s.slice)
return result
}
func TestDeviceTickerBasic(t *testing.T) {
duration := time.Millisecond
ticker := NewDeviceDataTicker(duration)
var payloads []*pubsub.V2DeviceData
var payloads syncSlice[*pubsub.V2DeviceData]
ticker.SetCallback(func(payload *pubsub.V2DeviceData) {
payloads = append(payloads, payload)
payloads.append(payload)
})
var wg sync.WaitGroup
wg.Add(1)
@ -31,29 +50,31 @@ func TestDeviceTickerBasic(t *testing.T) {
DeviceID: "b",
})
time.Sleep(duration * 2)
if len(payloads) != 1 {
t.Fatalf("expected 1 callback, got %d", len(payloads))
result := payloads.clone()
if len(result) != 1 {
t.Fatalf("expected 1 callback, got %d", len(result))
}
want := map[string][]string{
"a": {"b"},
}
assertPayloadEqual(t, payloads[0].UserIDToDeviceIDs, want)
assertPayloadEqual(t, result[0].UserIDToDeviceIDs, want)
// check stopping works
payloads = []*pubsub.V2DeviceData{}
payloads = syncSlice[*pubsub.V2DeviceData]{}
ticker.Stop()
wg.Wait()
time.Sleep(duration * 2)
if len(payloads) != 0 {
t.Fatalf("got extra payloads: %+v", payloads)
result = payloads.clone()
if len(result) != 0 {
t.Fatalf("got extra payloads: %+v", result)
}
}
func TestDeviceTickerBatchesCorrectly(t *testing.T) {
duration := 100 * time.Millisecond
ticker := NewDeviceDataTicker(duration)
var payloads []*pubsub.V2DeviceData
var payloads syncSlice[*pubsub.V2DeviceData]
ticker.SetCallback(func(payload *pubsub.V2DeviceData) {
payloads = append(payloads, payload)
payloads.append(payload)
})
go ticker.Run()
defer ticker.Stop()
@ -74,23 +95,23 @@ func TestDeviceTickerBatchesCorrectly(t *testing.T) {
DeviceID: "y", // new device and user
})
time.Sleep(duration * 2)
if len(payloads) != 1 {
t.Fatalf("expected 1 callback, got %d", len(payloads))
result := payloads.clone()
if len(result) != 1 {
t.Fatalf("expected 1 callback, got %d", len(result))
}
want := map[string][]string{
"a": {"b", "bb"},
"x": {"y"},
}
assertPayloadEqual(t, payloads[0].UserIDToDeviceIDs, want)
assertPayloadEqual(t, result[0].UserIDToDeviceIDs, want)
}
func TestDeviceTickerForgetsAfterEmitting(t *testing.T) {
duration := time.Millisecond
ticker := NewDeviceDataTicker(duration)
var payloads []*pubsub.V2DeviceData
var payloads syncSlice[*pubsub.V2DeviceData]
ticker.SetCallback(func(payload *pubsub.V2DeviceData) {
payloads = append(payloads, payload)
payloads.append(payload)
})
ticker.Remember(PollerID{
UserID: "a",
@ -104,8 +125,9 @@ func TestDeviceTickerForgetsAfterEmitting(t *testing.T) {
DeviceID: "b",
})
time.Sleep(10 * duration)
if len(payloads) != 1 {
t.Fatalf("got %d payloads, want 1", len(payloads))
result := payloads.clone()
if len(result) != 1 {
t.Fatalf("got %d payloads, want 1", len(result))
}
}

View File

@ -18,6 +18,31 @@ import (
const initialSinceToken = "0"
// monkey patch out time.Since with a test controlled value.
// This is done in the init block so we can make sure we swap it out BEFORE any pollers
// start. If we wait until pollers exist, we get data races. This includes pollers in tests
// which don't use timeSince, hence the init block.
var (
timeSinceMu sync.Mutex
timeSinceValue = time.Duration(0) // 0 means use the real impl
)
func setTimeSinceValue(val time.Duration) {
timeSinceMu.Lock()
timeSinceValue = time.Minute * 2
timeSinceMu.Unlock()
}
func init() {
timeSince = func(t time.Time) time.Duration {
timeSinceMu.Lock()
defer timeSinceMu.Unlock()
if timeSinceValue == 0 {
return time.Since(t)
}
return timeSinceValue
}
}
// Tests that EnsurePolling works in the happy case
func TestPollerMapEnsurePolling(t *testing.T) {
nextSince := "next"
@ -528,9 +553,8 @@ func TestPollerPollUpdateDeviceSincePeriodically(t *testing.T) {
wantSinceFromSync = next
// 4. ... some time has passed, this triggers the 1min limit
timeSince = func(d time.Time) time.Duration {
return time.Minute * 2
}
setTimeSinceValue(time.Minute * 2)
defer setTimeSinceValue(0) // reset
next = "10"
syncResponses <- &SyncResponse{NextBatch: next}
mustEqualSince(t, <-syncCalledWithSince, wantSinceFromSync)