Merge pull request #238 from matrix-org/dmr/ignored-users-1

This commit is contained in:
David Robertson 2023-08-03 16:46:11 +01:00 committed by GitHub
commit 34881e328f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 251 additions and 17 deletions

View File

@ -191,18 +191,22 @@ type UserCache struct {
store *state.Storage
globalCache *GlobalCache
txnIDs TransactionIDFetcher
ignoredUsers map[string]struct{}
ignoredUsersMu *sync.RWMutex
}
func NewUserCache(userID string, globalCache *GlobalCache, store *state.Storage, txnIDs TransactionIDFetcher) *UserCache {
uc := &UserCache{
UserID: userID,
roomToDataMu: &sync.RWMutex{},
roomToData: make(map[string]UserRoomData),
listeners: make(map[int]UserCacheListener),
listenersMu: &sync.RWMutex{},
store: store,
globalCache: globalCache,
txnIDs: txnIDs,
UserID: userID,
roomToDataMu: &sync.RWMutex{},
roomToData: make(map[string]UserRoomData),
listeners: make(map[int]UserCacheListener),
listenersMu: &sync.RWMutex{},
store: store,
globalCache: globalCache,
txnIDs: txnIDs,
ignoredUsers: make(map[string]struct{}),
ignoredUsersMu: &sync.RWMutex{},
}
return uc
}
@ -651,7 +655,8 @@ func (c *UserCache) OnAccountData(ctx context.Context, datas []state.AccountData
up := roomUpdates[d.RoomID]
up = append(up, d)
roomUpdates[d.RoomID] = up
if d.Type == "m.direct" {
switch d.Type {
case "m.direct":
dmRoomSet := make(map[string]struct{})
// pull out rooms and mark them as DMs
content := gjson.ParseBytes(d.Data).Get("content")
@ -676,7 +681,7 @@ func (c *UserCache) OnAccountData(ctx context.Context, datas []state.AccountData
c.roomToData[dmRoomID] = u
}
c.roomToDataMu.Unlock()
} else if d.Type == "m.tag" {
case "m.tag":
content := gjson.ParseBytes(d.Data).Get("content.tags")
if tagUpdates[d.RoomID] == nil {
tagUpdates[d.RoomID] = make(map[string]float64)
@ -685,6 +690,22 @@ func (c *UserCache) OnAccountData(ctx context.Context, datas []state.AccountData
tagUpdates[d.RoomID][k.Str] = v.Get("order").Float()
return true
})
case "m.ignored_user_list":
if d.RoomID != state.AccountDataGlobalRoom {
continue
}
content := gjson.ParseBytes(d.Data).Get("content.ignored_users")
if !content.IsObject() {
continue
}
ignoredUsers := make(map[string]struct{})
content.ForEach(func(k, v gjson.Result) bool {
ignoredUsers[k.Str] = struct{}{}
return true
})
c.ignoredUsersMu.Lock()
c.ignoredUsers = ignoredUsers
c.ignoredUsersMu.Unlock()
}
}
if len(tagUpdates) > 0 {
@ -717,3 +738,10 @@ func (c *UserCache) OnAccountData(ctx context.Context, datas []state.AccountData
}
}
func (u *UserCache) ShouldIgnore(userID string) bool {
u.ignoredUsersMu.RLock()
defer u.ignoredUsersMu.RUnlock()
_, ignored := u.ignoredUsers[userID]
return ignored
}

View File

@ -130,13 +130,24 @@ func (s *connStateLive) processLiveUpdate(ctx context.Context, up caches.Update,
internal.AssertWithContext(ctx, "processLiveUpdate: request list length != internal list length", s.lists.Len() == len(s.muxedReq.Lists))
roomUpdate, _ := up.(caches.RoomUpdate)
roomEventUpdate, _ := up.(*caches.RoomEventUpdate)
// if this is a room event update we may not want to process this if the event nid is < loadPos,
// as that means we have already taken it into account
if roomEventUpdate != nil && !roomEventUpdate.EventData.AlwaysProcess {
// check if we should skip this update. Do we know of this room (lp > 0) and if so, is this event
// behind what we've processed before?
lp := s.loadPositions[roomEventUpdate.RoomID()]
if lp > 0 && roomEventUpdate.EventData.NID < lp {
if roomEventUpdate != nil {
// if this is a room event update we may not want to process this event, for a few reasons.
if !roomEventUpdate.EventData.AlwaysProcess {
// check if we should skip this update. Do we know of this room (lp > 0) and if so, is this event
// behind what we've processed before?
lp := s.loadPositions[roomEventUpdate.RoomID()]
if lp > 0 && roomEventUpdate.EventData.NID < lp {
return false
}
}
// Skip message events from ignored users.
if roomEventUpdate.EventData.StateKey == nil && s.userCache.ShouldIgnore(roomEventUpdate.EventData.Sender) {
logger.Trace().
Str("user", s.userID).
Str("type", roomEventUpdate.EventData.EventType).
Str("sender", roomEventUpdate.EventData.Sender).
Msg("ignoring event update")
return false
}
}

View File

@ -507,6 +507,15 @@ func (h *SyncLiveHandler) userCache(userID string) (*caches.UserCache, error) {
uc.OnAccountData(context.Background(), []state.AccountData{directEvent[0]})
}
// select the ignored users account data event and set ignored user list
ignoreEvent, err := h.Storage.AccountData(userID, sync2.AccountDataGlobalRoom, []string{"m.ignored_user_list"})
if err != nil {
return nil, fmt.Errorf("failed to load ignored user list for user %s: %w", userID, err)
}
if len(ignoreEvent) == 1 {
uc.OnAccountData(context.Background(), []state.AccountData{ignoreEvent[0]})
}
// select all room tag account data and set it
tagEvents, err := h.Storage.RoomAccountDatasWithType(userID, "m.tag")
if err != nil {

View File

@ -0,0 +1,186 @@
package syncv3
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/sync2"
"github.com/matrix-org/sliding-sync/sync3"
"github.com/matrix-org/sliding-sync/testutils"
"github.com/matrix-org/sliding-sync/testutils/m"
"testing"
"time"
)
// Test that messages from ignored users are not sent to clients, even if they appear
// on someone else's poller first.
func TestIgnoredUsersDuringLiveUpdate(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
const nigel = "@nigel:localhost"
roomID := "!unimportant"
v2.addAccount(t, alice, aliceToken)
v2.addAccount(t, bob, bobToken)
// Bob creates a room. Nigel and Alice join.
state := createRoomState(t, bob, time.Now())
state = append(state, testutils.NewStateEvent(t, "m.room.member", nigel, nigel, map[string]interface{}{
"membership": "join",
}))
aliceJoin := testutils.NewStateEvent(t, "m.room.member", alice, alice, map[string]interface{}{
"membership": "join",
})
t.Log("Alice and Bob's pollers sees Alice's join.")
v2.queueResponse(alice, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
state: state,
events: []json.RawMessage{aliceJoin},
}),
},
NextBatch: "alice_sync_1",
})
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
state: state,
events: []json.RawMessage{aliceJoin},
}),
},
NextBatch: "bob_sync_1",
})
t.Log("Alice sliding syncs.")
aliceRes := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 10}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 20,
},
},
},
})
t.Log("She should see her join.")
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{aliceJoin})))
t.Log("Bob sliding syncs.")
bobRes := v3.mustDoV3Request(t, bobToken, sync3.Request{
Lists: map[string]sync3.RequestList{
"a": {
Ranges: sync3.SliceRanges{{0, 10}},
RoomSubscription: sync3.RoomSubscription{
TimelineLimit: 20,
},
},
},
})
t.Log("He should see Alice's join.")
m.MatchResponse(t, bobRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{aliceJoin})))
t.Log("Alice ignores Nigel.")
v2.queueResponse(alice, sync2.SyncResponse{
AccountData: sync2.EventsResponse{
Events: []json.RawMessage{
testutils.NewAccountData(t, "m.ignored_user_list", map[string]any{
"ignored_users": map[string]any{
nigel: map[string]any{},
},
}),
},
},
NextBatch: "alice_sync_2",
})
v2.waitUntilEmpty(t, alice)
t.Log("Bob's poller sees a message from Nigel, then a message from Alice.")
nigelMsg := testutils.NewMessageEvent(t, nigel, "naughty nigel")
aliceMsg := testutils.NewMessageEvent(t, alice, "angelic alice")
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{nigelMsg, aliceMsg},
}),
},
NextBatch: "bob_sync_2",
})
v2.waitUntilEmpty(t, bob)
t.Log("Bob syncs. He should see both messages.")
bobRes = v3.mustDoV3RequestWithPos(t, bobToken, bobRes.Pos, sync3.Request{})
m.MatchResponse(t, bobRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{nigelMsg, aliceMsg})))
t.Log("Alice syncs. She should see her message, but not Nigel's.")
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{aliceMsg})))
t.Log("Bob's poller sees Nigel set a custom state event")
nigelState := testutils.NewStateEvent(t, "com.example.fruit", "banana", nigel, map[string]any{})
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{nigelState},
}),
},
NextBatch: "bob_sync_3",
})
v2.waitUntilEmpty(t, bob)
t.Log("Alice syncs. She should see Nigel's state event.")
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{nigelState})))
t.Log("Bob's poller sees Alice send a message.")
aliceMsg2 := testutils.NewMessageEvent(t, alice, "angelic alice 2")
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{aliceMsg2},
}),
},
NextBatch: "bob_sync_4",
})
v2.waitUntilEmpty(t, bob)
t.Log("Alice syncs, making a new conn with a direct room subscription.")
aliceRes = v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{},
RoomSubscriptions: map[string]sync3.RoomSubscription{
roomID: {
TimelineLimit: 20,
},
},
})
t.Log("Alice sees her message.")
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimelineMostRecent(1, []json.RawMessage{aliceMsg2})))
t.Log("Bob's poller sees Nigel and Alice send a message.")
nigelMsg2 := testutils.NewMessageEvent(t, nigel, "naughty nigel 3")
aliceMsg3 := testutils.NewMessageEvent(t, alice, "angelic alice 3")
v2.queueResponse(bob, sync2.SyncResponse{
Rooms: sync2.SyncRoomsResponse{
Join: v2JoinTimeline(roomEvents{
roomID: roomID,
events: []json.RawMessage{nigelMsg2, aliceMsg3},
}),
},
NextBatch: "bob_sync_5",
})
v2.waitUntilEmpty(t, bob)
t.Log("Alice syncs. She should only see her message.")
aliceRes = v3.mustDoV3RequestWithPos(t, aliceToken, aliceRes.Pos, sync3.Request{})
m.MatchResponse(t, aliceRes, m.MatchRoomSubscription(roomID, m.MatchRoomTimeline([]json.RawMessage{aliceMsg3})))
}