mirror of
https://github.com/matrix-org/sliding-sync.git
synced 2025-03-10 13:37:11 +00:00
Improve logging; add more typing stream code
This commit is contained in:
parent
749e4d413e
commit
f03669ff31
@ -44,12 +44,12 @@ func (s *Storage) Initialise(roomID string, state []json.RawMessage) error {
|
||||
}
|
||||
|
||||
// Typing returns who is currently typing in this room along with the latest stream ID.
|
||||
func (s *Storage) Typing(roomID string, streamID int) ([]string, int, error) {
|
||||
return s.typingTable.Typing(roomID, streamID)
|
||||
func (s *Storage) Typing(roomID string, fromStreamIDExcl, toStreamIDIncl int64) ([]string, error) {
|
||||
return s.typingTable.Typing(roomID, fromStreamIDExcl, toStreamIDIncl)
|
||||
}
|
||||
|
||||
// SetTyping sets who is typing in the room. An empty list removes all typing users. Returns the
|
||||
// stream ID of the newly inserted typing users.
|
||||
func (s *Storage) SetTyping(roomID string, userIDs []string) (int, error) {
|
||||
func (s *Storage) SetTyping(roomID string, userIDs []string) (int64, error) {
|
||||
return s.typingTable.SetTyping(roomID, userIDs)
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ func NewTypingTable(db *sqlx.DB) *TypingTable {
|
||||
return &TypingTable{db}
|
||||
}
|
||||
|
||||
func (t *TypingTable) SetTyping(roomID string, userIDs []string) (streamID int, err error) {
|
||||
func (t *TypingTable) SetTyping(roomID string, userIDs []string) (streamID int64, err error) {
|
||||
if userIDs == nil {
|
||||
userIDs = []string{}
|
||||
}
|
||||
@ -39,13 +39,13 @@ func (t *TypingTable) SetTyping(roomID string, userIDs []string) (streamID int,
|
||||
return streamID, err
|
||||
}
|
||||
|
||||
func (t *TypingTable) Typing(roomID string, streamID int) (userIDs []string, newStreamID int, err error) {
|
||||
func (t *TypingTable) Typing(roomID string, fromStreamIDExcl, toStreamIDIncl int64) (userIDs []string, err error) {
|
||||
var userIDsArray pq.StringArray
|
||||
err = t.db.QueryRow(
|
||||
`SELECT stream_id, user_ids FROM syncv3_typing WHERE room_id=$1 AND stream_id > $2`, roomID, streamID,
|
||||
).Scan(&newStreamID, &userIDsArray)
|
||||
`SELECT user_ids FROM syncv3_typing WHERE room_id=$1 AND stream_id > $2 AND stream_id <= $3`, roomID, fromStreamIDExcl, toStreamIDIncl,
|
||||
).Scan(&userIDsArray)
|
||||
if err == sql.ErrNoRows {
|
||||
err = nil
|
||||
}
|
||||
return userIDsArray, newStreamID, err
|
||||
return userIDsArray, err
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ func TestTypingTable(t *testing.T) {
|
||||
}
|
||||
roomID := "!foo:localhost"
|
||||
table := NewTypingTable(db)
|
||||
lastStreamID := -1
|
||||
lastStreamID := int64(-1)
|
||||
|
||||
setAndCheck := func() {
|
||||
streamID, err := table.SetTyping(roomID, userIDs)
|
||||
@ -32,16 +32,13 @@ func TestTypingTable(t *testing.T) {
|
||||
t.Errorf("SetTyping: streamID returned should always be increasing but it wasn't, got %d, last %d", streamID, lastStreamID)
|
||||
}
|
||||
lastStreamID = streamID
|
||||
gotUserIDs, newStreamID, err := table.Typing(roomID, streamID-1)
|
||||
gotUserIDs, err := table.Typing(roomID, streamID-1, streamID)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to Typing: %s", err)
|
||||
}
|
||||
if !reflect.DeepEqual(gotUserIDs, userIDs) {
|
||||
t.Errorf("got typing users %v want %v", gotUserIDs, userIDs)
|
||||
}
|
||||
if newStreamID != streamID {
|
||||
t.Errorf("Typing did not return the same stream ID as SetTyping, got %d want %d", newStreamID, streamID)
|
||||
}
|
||||
}
|
||||
setAndCheck()
|
||||
userIDs = userIDs[1:]
|
||||
|
@ -1,7 +1,35 @@
|
||||
package streams
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/sync-v3/state"
|
||||
)
|
||||
|
||||
type FilterTyping struct {
|
||||
RoomID string `json:"room_id"`
|
||||
}
|
||||
|
||||
type TypingResponse struct{}
|
||||
type TypingResponse struct {
|
||||
UserIDs []string `json:"user_ids"`
|
||||
}
|
||||
|
||||
// Typing represents a stream of users who are typing.
|
||||
type Typing struct {
|
||||
storage *state.Storage
|
||||
}
|
||||
|
||||
func NewTyping(s *state.Storage) *Typing {
|
||||
return &Typing{s}
|
||||
}
|
||||
|
||||
func (s *Typing) Process(userID string, from, to int64, f *FilterTyping) (resp *TypingResponse, err error) {
|
||||
userIDs, err := s.storage.Typing(f.RoomID, from, to)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Typing: %s", err)
|
||||
}
|
||||
resp = &TypingResponse{
|
||||
UserIDs: userIDs,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package sync2
|
||||
import (
|
||||
"encoding/json"
|
||||
"math"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/sync-v3/state"
|
||||
@ -28,11 +27,11 @@ type Poller struct {
|
||||
Terminated bool
|
||||
}
|
||||
|
||||
func NewPoller(authHeader, deviceID string, client Client, storage *state.Storage, sessions *sync3.Sessions) *Poller {
|
||||
return newPoller(authHeader, deviceID, client, storage, sessions)
|
||||
func NewPoller(authHeader, deviceID string, client Client, storage *state.Storage, sessions *sync3.Sessions, logger zerolog.Logger) *Poller {
|
||||
return newPoller(authHeader, deviceID, client, storage, sessions, logger)
|
||||
}
|
||||
|
||||
func newPoller(authHeader, deviceID string, client Client, storage storageInterface, sessions sessionsInterface) *Poller {
|
||||
func newPoller(authHeader, deviceID string, client Client, storage storageInterface, sessions sessionsInterface, logger zerolog.Logger) *Poller {
|
||||
return &Poller{
|
||||
authorizationHeader: authHeader,
|
||||
deviceID: deviceID,
|
||||
@ -40,35 +39,32 @@ func newPoller(authHeader, deviceID string, client Client, storage storageInterf
|
||||
storage: storage,
|
||||
sessions: sessions,
|
||||
Terminated: false,
|
||||
logger: zerolog.New(os.Stdout).With().Timestamp().Logger().With().Str("device", deviceID).Logger().Output(zerolog.ConsoleWriter{
|
||||
Out: os.Stderr,
|
||||
TimeFormat: "15:04:05",
|
||||
}),
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Poll will block forever, repeatedly calling v2 sync. Do this in a goroutine.
|
||||
// Returns if the access token gets invalidated. Invokes the callback on first success.
|
||||
func (p *Poller) Poll(since string, callback func()) {
|
||||
p.logger.Info().Str("since", since).Msg("v2 poll loop started")
|
||||
p.logger.Info().Str("since", since).Msg("Poller: v2 poll loop started")
|
||||
failCount := 0
|
||||
firstTime := true
|
||||
for {
|
||||
if failCount > 0 {
|
||||
waitTime := time.Duration(math.Pow(2, float64(failCount))) * time.Second
|
||||
p.logger.Warn().Str("duration", waitTime.String()).Msg("waiting before next poll")
|
||||
p.logger.Warn().Str("duration", waitTime.String()).Msg("Poller: waiting before next poll")
|
||||
timeSleep(waitTime)
|
||||
}
|
||||
p.logger.Info().Str("since", since).Msg("requesting data")
|
||||
p.logger.Info().Str("since", since).Msg("Poller: requesting data")
|
||||
resp, statusCode, err := p.client.DoSyncV2(p.authorizationHeader, since)
|
||||
if err != nil {
|
||||
// check if temporary
|
||||
if statusCode != 401 {
|
||||
p.logger.Warn().Int("code", statusCode).Err(err).Msg("sync v2 poll returned temporary error")
|
||||
p.logger.Warn().Int("code", statusCode).Err(err).Msg("Poller: sync v2 poll returned temporary error")
|
||||
failCount += 1
|
||||
continue
|
||||
} else {
|
||||
p.logger.Warn().Msg("access token has been invalidated, terminating loop")
|
||||
p.logger.Warn().Msg("Poller: access token has been invalidated, terminating loop")
|
||||
p.Terminated = true
|
||||
return
|
||||
}
|
||||
@ -80,7 +76,7 @@ func (p *Poller) Poll(since string, callback func()) {
|
||||
err = p.sessions.UpdateDeviceSince(p.deviceID, since)
|
||||
if err != nil {
|
||||
// non-fatal
|
||||
p.logger.Warn().Str("since", since).Err(err).Msg("failed to persist new since value")
|
||||
p.logger.Warn().Str("since", since).Err(err).Msg("Poller: failed to persist new since value")
|
||||
}
|
||||
|
||||
if firstTime {
|
||||
@ -92,18 +88,26 @@ func (p *Poller) Poll(since string, callback func()) {
|
||||
|
||||
func (p *Poller) accumulate(res *SyncResponse) {
|
||||
if len(res.Rooms.Join) == 0 {
|
||||
p.logger.Info().Msg("Poller: no rooms in join response")
|
||||
return
|
||||
}
|
||||
initCalls := 0
|
||||
accumCalls := 0
|
||||
typingCalls := 0
|
||||
for roomID, roomData := range res.Rooms.Join {
|
||||
if len(roomData.State.Events) > 0 {
|
||||
initCalls++
|
||||
err := p.storage.Initialise(roomID, roomData.State.Events)
|
||||
if err != nil {
|
||||
p.logger.Err(err).Str("room_id", roomID).Int("num_state_events", len(roomData.State.Events)).Msg("Accumulator.Initialise failed")
|
||||
p.logger.Err(err).Str("room_id", roomID).Int("num_state_events", len(roomData.State.Events)).Msg("Poller: Accumulator.Initialise failed")
|
||||
}
|
||||
}
|
||||
err := p.storage.Accumulate(roomID, roomData.Timeline.Events)
|
||||
if err != nil {
|
||||
p.logger.Err(err).Str("room_id", roomID).Int("num_timeline_events", len(roomData.Timeline.Events)).Msg("Accumulator.Accumulate failed")
|
||||
if len(roomData.Timeline.Events) > 0 {
|
||||
accumCalls++
|
||||
err := p.storage.Accumulate(roomID, roomData.Timeline.Events)
|
||||
if err != nil {
|
||||
p.logger.Err(err).Str("room_id", roomID).Int("num_timeline_events", len(roomData.Timeline.Events)).Msg("Poller: Accumulator.Accumulate failed")
|
||||
}
|
||||
}
|
||||
for _, ephEvent := range roomData.Ephemeral.Events {
|
||||
if gjson.GetBytes(ephEvent, "type").Str == "m.typing" {
|
||||
@ -117,14 +121,19 @@ func (p *Poller) accumulate(res *SyncResponse) {
|
||||
userIDs = append(userIDs, u.Str)
|
||||
}
|
||||
}
|
||||
_, err = p.storage.SetTyping(roomID, userIDs)
|
||||
typingCalls++
|
||||
_, err := p.storage.SetTyping(roomID, userIDs)
|
||||
if err != nil {
|
||||
p.logger.Err(err).Str("room_id", roomID).Strs("user_ids", userIDs).Msg("Accumulator: failed to set typing")
|
||||
p.logger.Err(err).Str("room_id", roomID).Strs("user_ids", userIDs).Msg("Poller: failed to SetTyping")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
p.logger.Info().Int("num_rooms", len(res.Rooms.Join)).Msg("accumulated data")
|
||||
p.logger.Info().Ints(
|
||||
"rooms [invite,join,leave]", []int{len(res.Rooms.Invite), len(res.Rooms.Join), len(res.Rooms.Leave)},
|
||||
).Ints(
|
||||
"storage [inits,accum,typing]", []int{initCalls, accumCalls, typingCalls},
|
||||
).Msg("Poller: accumulated data")
|
||||
}
|
||||
|
||||
// the subset of Sessions which the poller uses, mocked for tests
|
||||
@ -132,14 +141,9 @@ type sessionsInterface interface {
|
||||
UpdateDeviceSince(deviceID, since string) error
|
||||
}
|
||||
|
||||
// the subset of Client which the poller uses, mocked for tests
|
||||
type clientInterface interface {
|
||||
DoSyncV2(authHeader, since string) (*SyncResponse, int, error)
|
||||
}
|
||||
|
||||
// the subset of Storage which the poller uses, mocked for tests
|
||||
type storageInterface interface {
|
||||
Accumulate(roomID string, timeline []json.RawMessage) error
|
||||
Initialise(roomID string, state []json.RawMessage) error
|
||||
SetTyping(roomID string, userIDs []string) (int, error)
|
||||
SetTyping(roomID string, userIDs []string) (int64, error)
|
||||
}
|
||||
|
@ -3,10 +3,13 @@ package sync2
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Check that a call to Poll starts polling and accumulating, and terminates on 401s.
|
||||
@ -41,7 +44,7 @@ func TestPollerPollFromNothing(t *testing.T) {
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions, zerolog.New(os.Stderr))
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
poller.Poll("", func() {
|
||||
@ -118,7 +121,7 @@ func TestPollerPollFromExisting(t *testing.T) {
|
||||
})
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions, zerolog.New(os.Stderr))
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
poller.Poll(since, func() {
|
||||
@ -186,7 +189,7 @@ func TestPollerBackoff(t *testing.T) {
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions)
|
||||
poller := newPoller("Authorization: hello world", deviceID, client, accumulator, sessions, zerolog.New(os.Stderr))
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
poller.Poll("some_since_value", func() {
|
||||
@ -226,7 +229,7 @@ func (a *mockAccumulator) Initialise(roomID string, state []json.RawMessage) err
|
||||
a.states[roomID] = state
|
||||
return nil
|
||||
}
|
||||
func (a *mockAccumulator) SetTyping(roomID string, userIDs []string) (int, error) {
|
||||
func (a *mockAccumulator) SetTyping(roomID string, userIDs []string) (int64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
@ -6,11 +6,12 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// V3_S1_57423_F9
|
||||
// "V3_S" $SESSION "_" $NID "_F" $FILTER
|
||||
// V3_S1_57423_123_F9
|
||||
// "V3_S" $SESSION "_" $NID "_" $TYPING "_F" $FILTER
|
||||
type Token struct {
|
||||
SessionID int64
|
||||
NID int64
|
||||
TypingID int64
|
||||
FilterID int64
|
||||
}
|
||||
|
||||
@ -19,18 +20,18 @@ func (t Token) String() string {
|
||||
if t.FilterID != 0 {
|
||||
filterID = fmt.Sprintf("%d", t.FilterID)
|
||||
}
|
||||
return fmt.Sprintf("V3_S%d_%d_F%s", t.SessionID, t.NID, filterID)
|
||||
return fmt.Sprintf("V3_S%d_%d_%d_F%s", t.SessionID, t.NID, t.TypingID, filterID)
|
||||
}
|
||||
|
||||
func NewSyncToken(since string) (*Token, error) {
|
||||
segments := strings.SplitN(since, "_", 4)
|
||||
if len(segments) != 4 {
|
||||
segments := strings.SplitN(since, "_", 5)
|
||||
if len(segments) != 5 {
|
||||
return nil, fmt.Errorf("not a sync v3 token")
|
||||
}
|
||||
if segments[0] != "V3" {
|
||||
return nil, fmt.Errorf("not a sync v3 token: %s", since)
|
||||
}
|
||||
filterstr := strings.TrimPrefix(segments[3], "F")
|
||||
filterstr := strings.TrimPrefix(segments[4], "F")
|
||||
var fid int64
|
||||
var err error
|
||||
if len(filterstr) > 0 {
|
||||
@ -47,11 +48,16 @@ func NewSyncToken(since string) (*Token, error) {
|
||||
}
|
||||
nid, err := strconv.ParseInt(segments[2], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid nid: %s", sidstr)
|
||||
return nil, fmt.Errorf("invalid nid: %s", segments[2])
|
||||
}
|
||||
typingid, err := strconv.ParseInt(segments[3], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid typing: %s", segments[3])
|
||||
}
|
||||
return &Token{
|
||||
SessionID: sid,
|
||||
NID: nid,
|
||||
FilterID: fid,
|
||||
TypingID: typingid,
|
||||
}, nil
|
||||
}
|
||||
|
@ -22,19 +22,21 @@ func TestNewSyncToken(t *testing.T) {
|
||||
},
|
||||
{
|
||||
// with filter
|
||||
in: "V3_S1_12_F6",
|
||||
in: "V3_S1_12_19_F6",
|
||||
outToken: &Token{
|
||||
SessionID: 1,
|
||||
NID: 12,
|
||||
TypingID: 19,
|
||||
FilterID: 6,
|
||||
},
|
||||
},
|
||||
{
|
||||
// without filter
|
||||
in: "V3_S1_33_F",
|
||||
in: "V3_S1_33_100_F",
|
||||
outToken: &Token{
|
||||
SessionID: 1,
|
||||
NID: 33,
|
||||
TypingID: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
97
v3.go
97
v3.go
@ -13,17 +13,13 @@ import (
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/matrix-org/sync-v3/state"
|
||||
"github.com/matrix-org/sync-v3/streams"
|
||||
"github.com/matrix-org/sync-v3/sync2"
|
||||
"github.com/matrix-org/sync-v3/sync3"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/hlog"
|
||||
)
|
||||
|
||||
var log = zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
|
||||
Out: os.Stderr,
|
||||
TimeFormat: "15:04:05",
|
||||
})
|
||||
|
||||
type server struct {
|
||||
chain []func(next http.Handler) http.Handler
|
||||
final http.Handler
|
||||
@ -39,19 +35,17 @@ func (s *server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
|
||||
// RunSyncV3Server is the main entry point to the server
|
||||
func RunSyncV3Server(destinationServer, bindAddr, postgresDBURI string) {
|
||||
logger := zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
|
||||
Out: os.Stderr,
|
||||
TimeFormat: "15:04:05",
|
||||
})
|
||||
// dependency inject all components together
|
||||
sh := &SyncV3Handler{
|
||||
V2: &sync2.HTTPClient{
|
||||
Client: &http.Client{
|
||||
Timeout: 5 * time.Minute,
|
||||
},
|
||||
DestinationServer: destinationServer,
|
||||
sh := NewSyncV3Handler(&sync2.HTTPClient{
|
||||
Client: &http.Client{
|
||||
Timeout: 5 * time.Minute,
|
||||
},
|
||||
Sessions: sync3.NewSessions(postgresDBURI),
|
||||
Storage: state.NewStorage(postgresDBURI),
|
||||
Pollers: make(map[string]*sync2.Poller),
|
||||
pollerMu: &sync.Mutex{},
|
||||
}
|
||||
DestinationServer: destinationServer,
|
||||
}, postgresDBURI)
|
||||
|
||||
// HTTP path routing
|
||||
r := mux.NewRouter()
|
||||
@ -59,7 +53,7 @@ func RunSyncV3Server(destinationServer, bindAddr, postgresDBURI string) {
|
||||
|
||||
srv := &server{
|
||||
chain: []func(next http.Handler) http.Handler{
|
||||
hlog.NewHandler(log),
|
||||
hlog.NewHandler(logger),
|
||||
hlog.AccessHandler(func(r *http.Request, status, size int, duration time.Duration) {
|
||||
hlog.FromRequest(r).Info().
|
||||
Str("method", r.Method).
|
||||
@ -75,9 +69,9 @@ func RunSyncV3Server(destinationServer, bindAddr, postgresDBURI string) {
|
||||
}
|
||||
|
||||
// Block forever
|
||||
log.Info().Msgf("listening on %s", bindAddr)
|
||||
logger.Info().Msgf("listening on %s", bindAddr)
|
||||
if err := http.ListenAndServe(bindAddr, srv); err != nil {
|
||||
log.Fatal().Err(err).Msg("failed to listen and serve")
|
||||
logger.Fatal().Err(err).Msg("failed to listen and serve")
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,10 +89,24 @@ type SyncV3Handler struct {
|
||||
Sessions *sync3.Sessions
|
||||
Storage *state.Storage
|
||||
|
||||
typingStream *streams.Typing
|
||||
|
||||
pollerMu *sync.Mutex
|
||||
Pollers map[string]*sync2.Poller // device_id -> poller
|
||||
}
|
||||
|
||||
func NewSyncV3Handler(v2Client sync2.Client, postgresDBURI string) *SyncV3Handler {
|
||||
sh := &SyncV3Handler{
|
||||
V2: v2Client,
|
||||
Sessions: sync3.NewSessions(postgresDBURI),
|
||||
Storage: state.NewStorage(postgresDBURI),
|
||||
Pollers: make(map[string]*sync2.Poller),
|
||||
pollerMu: &sync.Mutex{},
|
||||
}
|
||||
sh.typingStream = streams.NewTyping(sh.Storage)
|
||||
return sh
|
||||
}
|
||||
|
||||
func (h *SyncV3Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
err := h.serve(w, req)
|
||||
if err != nil {
|
||||
@ -112,10 +120,11 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
if herr != nil {
|
||||
return herr
|
||||
}
|
||||
log.Info().Int64("session", session.ID).Str("device", session.DeviceID).Msg("recv /v3/sync")
|
||||
log := hlog.FromRequest(req)
|
||||
log.Info().Int64("session", session.ID).Str("device", session.DeviceID).Str("user_id", session.UserID).Msg("recv /v3/sync")
|
||||
|
||||
// make sure we have a poller for this device
|
||||
h.ensurePolling(req.Header.Get("Authorization"), session)
|
||||
h.ensurePolling(req.Header.Get("Authorization"), session, log.With().Int64("session", session.ID).Logger())
|
||||
|
||||
// fetch the latest value which we'll base our response on
|
||||
latestNID, err := h.Storage.LatestEventNID()
|
||||
@ -129,14 +138,14 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
SessionID: session.ID,
|
||||
NID: latestNID,
|
||||
}
|
||||
/*
|
||||
var from int64
|
||||
if tokv3 != nil {
|
||||
from = tokv3.NID
|
||||
} */
|
||||
|
||||
var from int64
|
||||
if tokv3 != nil {
|
||||
from = tokv3.NID
|
||||
}
|
||||
|
||||
// read filters and mux in to form complete request
|
||||
_, filterID, herr := h.parseRequest(req, tokv3, session)
|
||||
syncReq, filterID, herr := h.parseRequest(req, tokv3, session)
|
||||
if herr != nil {
|
||||
return herr
|
||||
}
|
||||
@ -145,23 +154,22 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
upcoming.FilterID = filterID
|
||||
}
|
||||
|
||||
// TODO: invoke streams to get responses
|
||||
/*
|
||||
f := false
|
||||
filter := &streams.FilterRoomList{
|
||||
EntriesPerBatch: 5,
|
||||
RoomNameSize: 70,
|
||||
IncludeRoomAvatarMXC: &f,
|
||||
SummaryEventTypes: []string{"m.room.message", "m.room.member"},
|
||||
}
|
||||
stream := streams.NewRoomList(h.Storage)
|
||||
_, _, err = stream.Process(session.DeviceID, from, latestNID, "", filter)
|
||||
// start making the response
|
||||
resp := sync3.Response{
|
||||
Next: upcoming.String(),
|
||||
}
|
||||
|
||||
// invoke streams to get responses
|
||||
if syncReq.Typing != nil {
|
||||
typingResp, err := h.typingStream.Process(session.UserID, from, latestNID, syncReq.Typing)
|
||||
if err != nil {
|
||||
return &handlerError{
|
||||
err: err,
|
||||
StatusCode: 500,
|
||||
err: fmt.Errorf("typing stream: %s", err),
|
||||
}
|
||||
} */
|
||||
}
|
||||
resp.Typing = typingResp
|
||||
}
|
||||
|
||||
// finally update our records: confirm that the client received the token they sent us, and mark this
|
||||
// response as unconfirmed
|
||||
@ -169,6 +177,7 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
if tokv3 != nil {
|
||||
confirmed = tokv3.String()
|
||||
}
|
||||
log.Info().Int64("session", session.ID).Str("since", confirmed).Str("new_since", upcoming.String()).Msg("responding")
|
||||
if err := h.Sessions.UpdateLastTokens(session.ID, confirmed, upcoming.String()); err != nil {
|
||||
return &handlerError{
|
||||
err: err,
|
||||
@ -177,9 +186,6 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
}
|
||||
|
||||
w.WriteHeader(200)
|
||||
resp := sync3.Response{
|
||||
Next: upcoming.String(),
|
||||
}
|
||||
if err := json.NewEncoder(w).Encode(&resp); err != nil {
|
||||
log.Warn().Err(err).Msg("failed to marshal response")
|
||||
}
|
||||
@ -189,6 +195,7 @@ func (h *SyncV3Handler) serve(w http.ResponseWriter, req *http.Request) *handler
|
||||
// getOrCreateSession retrieves an existing session if ?since= is set, else makes a new session.
|
||||
// Returns a session or an error. Returns a token if and only if there is an existing session.
|
||||
func (h *SyncV3Handler) getOrCreateSession(req *http.Request) (*sync3.Session, *sync3.Token, *handlerError) {
|
||||
log := hlog.FromRequest(req)
|
||||
var session *sync3.Session
|
||||
var tokv3 *sync3.Token
|
||||
deviceID, err := deviceIDFromRequest(req)
|
||||
@ -227,7 +234,7 @@ func (h *SyncV3Handler) getOrCreateSession(req *http.Request) (*sync3.Session, *
|
||||
// ensurePolling makes sure there is a poller for this device, making one if need be.
|
||||
// Blocks until at least 1 sync is done if and only if the poller was just created.
|
||||
// This ensures that calls to the database will return data.
|
||||
func (h *SyncV3Handler) ensurePolling(authHeader string, session *sync3.Session) {
|
||||
func (h *SyncV3Handler) ensurePolling(authHeader string, session *sync3.Session, logger zerolog.Logger) {
|
||||
h.pollerMu.Lock()
|
||||
poller, ok := h.Pollers[session.DeviceID]
|
||||
// either no poller exists or it did but it died
|
||||
@ -236,7 +243,7 @@ func (h *SyncV3Handler) ensurePolling(authHeader string, session *sync3.Session)
|
||||
return
|
||||
}
|
||||
// replace the poller
|
||||
poller = sync2.NewPoller(authHeader, session.DeviceID, h.V2, h.Storage, h.Sessions)
|
||||
poller = sync2.NewPoller(authHeader, session.DeviceID, h.V2, h.Storage, h.Sessions, logger)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go poller.Poll(session.V2Since, func() {
|
||||
|
47
v3_test.go
47
v3_test.go
@ -4,12 +4,14 @@ import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/sync-v3/state"
|
||||
"github.com/matrix-org/sync-v3/sync2"
|
||||
"github.com/matrix-org/sync-v3/sync3"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/hlog"
|
||||
)
|
||||
|
||||
type mockV2Client struct {
|
||||
@ -49,16 +51,18 @@ func TestHandler(t *testing.T) {
|
||||
bob := "@bob:localhost"
|
||||
roomID := "!foo:localhost"
|
||||
v2ServerChan := make(chan *sync2.SyncResponse, 10)
|
||||
h := SyncV3Handler{
|
||||
pollerMu: &sync.Mutex{},
|
||||
Sessions: sync3.NewSessions(postgresConnectionString),
|
||||
Storage: state.NewStorage(postgresConnectionString),
|
||||
Pollers: make(map[string]*sync2.Poller),
|
||||
V2: &mockV2Client{
|
||||
requester: alice,
|
||||
ch: v2ServerChan,
|
||||
},
|
||||
}
|
||||
// disable colours in tests to make it display nicer in IDEs
|
||||
log := zerolog.New(os.Stdout).With().Timestamp().Logger().Output(zerolog.ConsoleWriter{
|
||||
Out: os.Stderr,
|
||||
TimeFormat: "15:04:05",
|
||||
NoColor: true,
|
||||
})
|
||||
wrapper := hlog.NewHandler(log)
|
||||
h := NewSyncV3Handler(&mockV2Client{
|
||||
requester: alice,
|
||||
ch: v2ServerChan,
|
||||
}, postgresConnectionString)
|
||||
server := wrapper(h)
|
||||
|
||||
// prepare a response from v2
|
||||
v2Resp := &sync2.SyncResponse{
|
||||
@ -101,7 +105,7 @@ func TestHandler(t *testing.T) {
|
||||
},
|
||||
})))
|
||||
req.Header.Set("Authorization", aliceBearer)
|
||||
h.ServeHTTP(w, req)
|
||||
server.ServeHTTP(w, req)
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("/v3/sync returned HTTP %d want 200", w.Code)
|
||||
}
|
||||
@ -126,17 +130,28 @@ func TestHandler(t *testing.T) {
|
||||
},
|
||||
}
|
||||
v2ServerChan <- v2Resp
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// 2nd request with no special args should remember we want the typing notif
|
||||
w = httptest.NewRecorder()
|
||||
w.Body = bytes.NewBuffer(nil)
|
||||
req = httptest.NewRequest("POST", "/_matrix/client/v3/sync?since="+resp.Next, bytes.NewBuffer([]byte(`{}`)))
|
||||
req.Header.Set("Authorization", aliceBearer)
|
||||
h.ServeHTTP(w, req)
|
||||
server.ServeHTTP(w, req)
|
||||
if w.Code != 200 {
|
||||
t.Fatalf("/v3/sync returned HTTP %d want 200", w.Code)
|
||||
}
|
||||
|
||||
// TODO: Check that the response returns bob typing
|
||||
|
||||
// Check that the response returns bob typing
|
||||
_ = parseResponse(t, w.Body)
|
||||
/*
|
||||
if resp.Typing == nil {
|
||||
t.Fatalf("no typing response, wanted one")
|
||||
}
|
||||
if len(resp.Typing.UserIDs) != 1 {
|
||||
t.Fatalf("typing got %d users, want 1: %v", len(resp.Typing.UserIDs), resp.Typing.UserIDs)
|
||||
}
|
||||
if resp.Typing.UserIDs[0] != bob {
|
||||
t.Fatalf("typing got %s want %s", resp.Typing.UserIDs[0], bob)
|
||||
} */
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user