Fix remaining race conditions; add -race to CI

This commit is contained in:
Kegan Dougal 2024-03-11 10:30:03 +00:00
parent c7dd361ca6
commit 4d54faa1a6
3 changed files with 12 additions and 6 deletions

View File

@ -59,7 +59,7 @@ jobs:
- name: Test - name: Test
run: | run: |
set -euo pipefail set -euo pipefail
go test -count=1 -covermode=atomic -coverpkg ./... -p 1 -v -json $(go list ./... | grep -v tests-e2e) -coverprofile synccoverage.out 2>&1 | tee ./test-integration.log | gotestfmt -hide all go test -count=1 -race -covermode=atomic -coverpkg ./... -p 1 -v -json $(go list ./... | grep -v tests-e2e) -coverprofile synccoverage.out 2>&1 | tee ./test-integration.log | gotestfmt -hide all
shell: bash shell: bash
env: env:
POSTGRES_HOST: localhost POSTGRES_HOST: localhost

View File

@ -84,7 +84,11 @@ func (ps *PubSub) Notify(chanName string, p Payload) error {
return fmt.Errorf("notify with payload %v timed out", p.Type()) return fmt.Errorf("notify with payload %v timed out", p.Type())
} }
if ps.bufferSize == 0 { if ps.bufferSize == 0 {
// for some reason go test -race flags this as racing with calls
// to close(ch), despite the fact that it _should_ be thread-safe :S
ps.mu.Lock()
ch <- &emptyPayload{} ch <- &emptyPayload{}
ps.mu.Unlock()
} }
return nil return nil
} }

View File

@ -4,12 +4,14 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/matrix-org/sliding-sync/testutils"
"reflect" "reflect"
"sort" "sort"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/matrix-org/sliding-sync/sqlutil" "github.com/matrix-org/sliding-sync/sqlutil"
"github.com/matrix-org/sliding-sync/sync2" "github.com/matrix-org/sliding-sync/sync2"
@ -680,7 +682,7 @@ func TestAccumulatorConcurrency(t *testing.T) {
[]byte(`{"event_id":"con_4", "type":"m.room.name", "state_key":"", "content":{"name":"4"}}`), []byte(`{"event_id":"con_4", "type":"m.room.name", "state_key":"", "content":{"name":"4"}}`),
[]byte(`{"event_id":"con_5", "type":"m.room.name", "state_key":"", "content":{"name":"5"}}`), []byte(`{"event_id":"con_5", "type":"m.room.name", "state_key":"", "content":{"name":"5"}}`),
} }
totalNumNew := 0 var totalNumNew atomic.Int64
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(newEvents)) wg.Add(len(newEvents))
for i := 0; i < len(newEvents); i++ { for i := 0; i < len(newEvents); i++ {
@ -689,7 +691,7 @@ func TestAccumulatorConcurrency(t *testing.T) {
subset := newEvents[:(i + 1)] // i=0 => [1], i=1 => [1,2], etc subset := newEvents[:(i + 1)] // i=0 => [1], i=1 => [1,2], etc
err := sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error { err := sqlutil.WithTransaction(accumulator.db, func(txn *sqlx.Tx) error {
result, err := accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: subset}) result, err := accumulator.Accumulate(txn, userID, roomID, sync2.TimelineResponse{Events: subset})
totalNumNew += result.NumNew totalNumNew.Add(int64(result.NumNew))
return err return err
}) })
if err != nil { if err != nil {
@ -698,8 +700,8 @@ func TestAccumulatorConcurrency(t *testing.T) {
}(i) }(i)
} }
wg.Wait() // wait for all goroutines to finish wg.Wait() // wait for all goroutines to finish
if totalNumNew != len(newEvents) { if int(totalNumNew.Load()) != len(newEvents) {
t.Errorf("got %d total new events, want %d", totalNumNew, len(newEvents)) t.Errorf("got %d total new events, want %d", totalNumNew.Load(), len(newEvents))
} }
// check that the name of the room is "5" // check that the name of the room is "5"
snapshot := currentSnapshotNIDs(t, accumulator.snapshotTable, roomID) snapshot := currentSnapshotNIDs(t, accumulator.snapshotTable, roomID)