David Robertson f595aed2c5
Add a separate payload for redacting state
So that we don't end up nuking conns unnecessarily.
2023-11-01 19:03:17 +00:00

207 lines
4.7 KiB
Go

package pubsub
import (
"encoding/json"
"github.com/matrix-org/sliding-sync/internal"
)
// The channel which has V2* payloads
const ChanV2 = "v2ch"
// V2Listener describes the messages that sync v2 pollers will publish.
type V2Listener interface {
Initialise(p *V2Initialise)
Accumulate(p *V2Accumulate)
OnTransactionID(p *V2TransactionID)
OnAccountData(p *V2AccountData)
OnInvite(p *V2InviteRoom)
OnLeftRoom(p *V2LeaveRoom)
OnUnreadCounts(p *V2UnreadCounts)
OnInitialSyncComplete(p *V2InitialSyncComplete)
OnDeviceData(p *V2DeviceData)
OnTyping(p *V2Typing)
OnReceipt(p *V2Receipt)
OnDeviceMessages(p *V2DeviceMessages)
OnExpiredToken(p *V2ExpiredToken)
OnInvalidateRoom(p *V2InvalidateRoom)
OnStateRedaction(p *V2StateRedaction)
}
type V2Initialise struct {
RoomID string
SnapshotNID int64
}
func (*V2Initialise) Type() string { return "V2Initialise" }
type V2Accumulate struct {
RoomID string
PrevBatch string
EventNIDs []int64
}
func (*V2Accumulate) Type() string { return "V2Accumulate" }
// V2TransactionID is emitted by a poller when it sees an event with a transaction ID,
// or when it is certain that no other poller will see a transaction ID for this event
// (the "all-clear").
type V2TransactionID struct {
EventID string
RoomID string
UserID string // of the sender
DeviceID string
TransactionID string // Note: an empty transaction ID represents the all-clear.
NID int64
}
func (*V2TransactionID) Type() string { return "V2TransactionID" }
type V2UnreadCounts struct {
UserID string
RoomID string
HighlightCount *int
NotificationCount *int
}
func (*V2UnreadCounts) Type() string { return "V2UnreadCounts" }
type V2AccountData struct {
UserID string
RoomID string
Types []string
}
func (*V2AccountData) Type() string { return "V2AccountData" }
type V2LeaveRoom struct {
UserID string
RoomID string
LeaveEvent json.RawMessage
}
func (*V2LeaveRoom) Type() string { return "V2LeaveRoom" }
type V2InviteRoom struct {
UserID string
RoomID string
}
func (*V2InviteRoom) Type() string { return "V2InviteRoom" }
type V2InitialSyncComplete struct {
UserID string
DeviceID string
Success bool
}
func (*V2InitialSyncComplete) Type() string { return "V2InitialSyncComplete" }
type V2DeviceData struct {
UserIDToDeviceIDs map[string][]string
}
func (*V2DeviceData) Type() string { return "V2DeviceData" }
type V2Typing struct {
RoomID string
EphemeralEvent json.RawMessage
}
func (*V2Typing) Type() string { return "V2Typing" }
type V2Receipt struct {
RoomID string
Receipts []internal.Receipt
}
func (*V2Receipt) Type() string { return "V2Receipt" }
type V2DeviceMessages struct {
UserID string
DeviceID string
}
func (*V2DeviceMessages) Type() string { return "V2DeviceMessages" }
type V2ExpiredToken struct {
UserID string
DeviceID string
}
func (*V2ExpiredToken) Type() string { return "V2ExpiredToken" }
// V2StateRedaction is emitted when a timeline is seen that contains one or more
// redaction events targeting a piece of room state. The redaction will be emitted
// before its corresponding V2Accumulate payload is emitted.
type V2StateRedaction struct {
RoomID string
}
func (*V2StateRedaction) Type() string { return "V2StateRedaction" }
// V2InvalidateRoom is emitted after a non-incremental state change to a room, in place
// of a V2Initialise payload.
type V2InvalidateRoom struct {
RoomID string
}
func (*V2InvalidateRoom) Type() string { return "V2InvalidateRoom" }
type V2Sub struct {
listener Listener
receiver V2Listener
}
func NewV2Sub(l Listener, recv V2Listener) *V2Sub {
return &V2Sub{
listener: l,
receiver: recv,
}
}
func (v *V2Sub) Teardown() {
v.listener.Close()
}
func (v *V2Sub) onMessage(p Payload) {
switch pl := p.(type) {
case *V2Receipt:
v.receiver.OnReceipt(pl)
case *V2Initialise:
v.receiver.Initialise(pl)
case *V2Accumulate:
v.receiver.Accumulate(pl)
case *V2TransactionID:
v.receiver.OnTransactionID(pl)
case *V2AccountData:
v.receiver.OnAccountData(pl)
case *V2InviteRoom:
v.receiver.OnInvite(pl)
case *V2LeaveRoom:
v.receiver.OnLeftRoom(pl)
case *V2UnreadCounts:
v.receiver.OnUnreadCounts(pl)
case *V2InitialSyncComplete:
v.receiver.OnInitialSyncComplete(pl)
case *V2DeviceData:
v.receiver.OnDeviceData(pl)
case *V2Typing:
v.receiver.OnTyping(pl)
case *V2DeviceMessages:
v.receiver.OnDeviceMessages(pl)
case *V2ExpiredToken:
v.receiver.OnExpiredToken(pl)
case *V2InvalidateRoom:
v.receiver.OnInvalidateRoom(pl)
case *V2StateRedaction:
v.receiver.OnStateRedaction(pl)
default:
logger.Warn().Str("type", p.Type()).Msg("V2Sub: unhandled payload type")
}
}
func (v *V2Sub) Listen() error {
return v.listener.Listen(ChanV2, v.onMessage)
}