Bump the SDK and handle breaking changes on timeline diffing

- avoid race conditions between when a timeline listener starts publishing to the `updatePublisher` and when they are consumed (and in which order)
- subscribe to the timeline directly in the TimelineProvider and correctly process updates serially
- update the AutoUpdatingRoomTimelineProviderMock to use an SDK TimlineMock and listener to publish updates
This commit is contained in:
Stefan Ceriu 2024-06-13 12:37:49 +03:00 committed by Stefan Ceriu
parent b73e5e9e5d
commit 8b3141ffec
7 changed files with 292 additions and 77 deletions

View File

@ -7364,7 +7364,7 @@
repositoryURL = "https://github.com/element-hq/matrix-rust-components-swift"; repositoryURL = "https://github.com/element-hq/matrix-rust-components-swift";
requirement = { requirement = {
kind = exactVersion; kind = exactVersion;
version = 1.0.11; version = 1.0.13;
}; };
}; };
701C7BEF8F70F7A83E852DCC /* XCRemoteSwiftPackageReference "GZIP" */ = { 701C7BEF8F70F7A83E852DCC /* XCRemoteSwiftPackageReference "GZIP" */ = {

View File

@ -148,8 +148,8 @@
"kind" : "remoteSourceControl", "kind" : "remoteSourceControl",
"location" : "https://github.com/element-hq/matrix-rust-components-swift", "location" : "https://github.com/element-hq/matrix-rust-components-swift",
"state" : { "state" : {
"revision" : "e1aa005300f7c5d38ac8203a1668df99fff21bd4", "revision" : "94373ae0568cf1681c39b1a348601e733d76a7ec",
"version" : "1.0.11" "version" : "1.0.13"
} }
}, },
{ {

View File

@ -10248,6 +10248,46 @@ open class RoomSDKMock: MatrixRustSDK.Room {
} }
} }
//MARK: - clearComposerDraft
open var clearComposerDraftThrowableError: Error?
var clearComposerDraftUnderlyingCallsCount = 0
open var clearComposerDraftCallsCount: Int {
get {
if Thread.isMainThread {
return clearComposerDraftUnderlyingCallsCount
} else {
var returnValue: Int? = nil
DispatchQueue.main.sync {
returnValue = clearComposerDraftUnderlyingCallsCount
}
return returnValue!
}
}
set {
if Thread.isMainThread {
clearComposerDraftUnderlyingCallsCount = newValue
} else {
DispatchQueue.main.sync {
clearComposerDraftUnderlyingCallsCount = newValue
}
}
}
}
open var clearComposerDraftCalled: Bool {
return clearComposerDraftCallsCount > 0
}
open var clearComposerDraftClosure: (() async throws -> Void)?
open override func clearComposerDraft() async throws {
if let error = clearComposerDraftThrowableError {
throw error
}
clearComposerDraftCallsCount += 1
try await clearComposerDraftClosure?()
}
//MARK: - discardRoomKey //MARK: - discardRoomKey
open var discardRoomKeyThrowableError: Error? open var discardRoomKeyThrowableError: Error?
@ -11393,6 +11433,75 @@ open class RoomSDKMock: MatrixRustSDK.Room {
try await leaveClosure?() try await leaveClosure?()
} }
//MARK: - loadComposerDraft
open var loadComposerDraftThrowableError: Error?
var loadComposerDraftUnderlyingCallsCount = 0
open var loadComposerDraftCallsCount: Int {
get {
if Thread.isMainThread {
return loadComposerDraftUnderlyingCallsCount
} else {
var returnValue: Int? = nil
DispatchQueue.main.sync {
returnValue = loadComposerDraftUnderlyingCallsCount
}
return returnValue!
}
}
set {
if Thread.isMainThread {
loadComposerDraftUnderlyingCallsCount = newValue
} else {
DispatchQueue.main.sync {
loadComposerDraftUnderlyingCallsCount = newValue
}
}
}
}
open var loadComposerDraftCalled: Bool {
return loadComposerDraftCallsCount > 0
}
var loadComposerDraftUnderlyingReturnValue: ComposerDraft?
open var loadComposerDraftReturnValue: ComposerDraft? {
get {
if Thread.isMainThread {
return loadComposerDraftUnderlyingReturnValue
} else {
var returnValue: ComposerDraft?? = nil
DispatchQueue.main.sync {
returnValue = loadComposerDraftUnderlyingReturnValue
}
return returnValue!
}
}
set {
if Thread.isMainThread {
loadComposerDraftUnderlyingReturnValue = newValue
} else {
DispatchQueue.main.sync {
loadComposerDraftUnderlyingReturnValue = newValue
}
}
}
}
open var loadComposerDraftClosure: (() async throws -> ComposerDraft?)?
open override func loadComposerDraft() async throws -> ComposerDraft? {
if let error = loadComposerDraftThrowableError {
throw error
}
loadComposerDraftCallsCount += 1
if let loadComposerDraftClosure = loadComposerDraftClosure {
return try await loadComposerDraftClosure()
} else {
return loadComposerDraftReturnValue
}
}
//MARK: - markAsRead //MARK: - markAsRead
open var markAsReadReceiptTypeThrowableError: Error? open var markAsReadReceiptTypeThrowableError: Error?
@ -12397,6 +12506,50 @@ open class RoomSDKMock: MatrixRustSDK.Room {
} }
} }
//MARK: - saveComposerDraft
open var saveComposerDraftDraftThrowableError: Error?
var saveComposerDraftDraftUnderlyingCallsCount = 0
open var saveComposerDraftDraftCallsCount: Int {
get {
if Thread.isMainThread {
return saveComposerDraftDraftUnderlyingCallsCount
} else {
var returnValue: Int? = nil
DispatchQueue.main.sync {
returnValue = saveComposerDraftDraftUnderlyingCallsCount
}
return returnValue!
}
}
set {
if Thread.isMainThread {
saveComposerDraftDraftUnderlyingCallsCount = newValue
} else {
DispatchQueue.main.sync {
saveComposerDraftDraftUnderlyingCallsCount = newValue
}
}
}
}
open var saveComposerDraftDraftCalled: Bool {
return saveComposerDraftDraftCallsCount > 0
}
open var saveComposerDraftDraftReceivedDraft: ComposerDraft?
open var saveComposerDraftDraftReceivedInvocations: [ComposerDraft] = []
open var saveComposerDraftDraftClosure: ((ComposerDraft) async throws -> Void)?
open override func saveComposerDraft(draft: ComposerDraft) async throws {
if let error = saveComposerDraftDraftThrowableError {
throw error
}
saveComposerDraftDraftCallsCount += 1
saveComposerDraftDraftReceivedDraft = draft
saveComposerDraftDraftReceivedInvocations.append(draft)
try await saveComposerDraftDraftClosure?(draft)
}
//MARK: - sendCallNotification //MARK: - sendCallNotification
open var sendCallNotificationCallIdApplicationNotifyTypeMentionsThrowableError: Error? open var sendCallNotificationCallIdApplicationNotifyTypeMentionsThrowableError: Error?
@ -16621,13 +16774,13 @@ open class TimelineSDKMock: MatrixRustSDK.Timeline {
open var addListenerListenerReceivedListener: TimelineListener? open var addListenerListenerReceivedListener: TimelineListener?
open var addListenerListenerReceivedInvocations: [TimelineListener] = [] open var addListenerListenerReceivedInvocations: [TimelineListener] = []
var addListenerListenerUnderlyingReturnValue: RoomTimelineListenerResult! var addListenerListenerUnderlyingReturnValue: TaskHandle!
open var addListenerListenerReturnValue: RoomTimelineListenerResult! { open var addListenerListenerReturnValue: TaskHandle! {
get { get {
if Thread.isMainThread { if Thread.isMainThread {
return addListenerListenerUnderlyingReturnValue return addListenerListenerUnderlyingReturnValue
} else { } else {
var returnValue: RoomTimelineListenerResult? = nil var returnValue: TaskHandle? = nil
DispatchQueue.main.sync { DispatchQueue.main.sync {
returnValue = addListenerListenerUnderlyingReturnValue returnValue = addListenerListenerUnderlyingReturnValue
} }
@ -16645,9 +16798,9 @@ open class TimelineSDKMock: MatrixRustSDK.Timeline {
} }
} }
} }
open var addListenerListenerClosure: ((TimelineListener) async -> RoomTimelineListenerResult)? open var addListenerListenerClosure: ((TimelineListener) async -> TaskHandle)?
open override func addListener(listener: TimelineListener) async -> RoomTimelineListenerResult { open override func addListener(listener: TimelineListener) async -> TaskHandle {
addListenerListenerCallsCount += 1 addListenerListenerCallsCount += 1
addListenerListenerReceivedListener = listener addListenerListenerReceivedListener = listener
addListenerListenerReceivedInvocations.append(listener) addListenerListenerReceivedInvocations.append(listener)
@ -17198,6 +17351,79 @@ open class TimelineSDKMock: MatrixRustSDK.Timeline {
} }
} }
//MARK: - loadReplyDetails
open var loadReplyDetailsEventIdStrThrowableError: Error?
var loadReplyDetailsEventIdStrUnderlyingCallsCount = 0
open var loadReplyDetailsEventIdStrCallsCount: Int {
get {
if Thread.isMainThread {
return loadReplyDetailsEventIdStrUnderlyingCallsCount
} else {
var returnValue: Int? = nil
DispatchQueue.main.sync {
returnValue = loadReplyDetailsEventIdStrUnderlyingCallsCount
}
return returnValue!
}
}
set {
if Thread.isMainThread {
loadReplyDetailsEventIdStrUnderlyingCallsCount = newValue
} else {
DispatchQueue.main.sync {
loadReplyDetailsEventIdStrUnderlyingCallsCount = newValue
}
}
}
}
open var loadReplyDetailsEventIdStrCalled: Bool {
return loadReplyDetailsEventIdStrCallsCount > 0
}
open var loadReplyDetailsEventIdStrReceivedEventIdStr: String?
open var loadReplyDetailsEventIdStrReceivedInvocations: [String] = []
var loadReplyDetailsEventIdStrUnderlyingReturnValue: InReplyToDetails!
open var loadReplyDetailsEventIdStrReturnValue: InReplyToDetails! {
get {
if Thread.isMainThread {
return loadReplyDetailsEventIdStrUnderlyingReturnValue
} else {
var returnValue: InReplyToDetails? = nil
DispatchQueue.main.sync {
returnValue = loadReplyDetailsEventIdStrUnderlyingReturnValue
}
return returnValue!
}
}
set {
if Thread.isMainThread {
loadReplyDetailsEventIdStrUnderlyingReturnValue = newValue
} else {
DispatchQueue.main.sync {
loadReplyDetailsEventIdStrUnderlyingReturnValue = newValue
}
}
}
}
open var loadReplyDetailsEventIdStrClosure: ((String) async throws -> InReplyToDetails)?
open override func loadReplyDetails(eventIdStr: String) async throws -> InReplyToDetails {
if let error = loadReplyDetailsEventIdStrThrowableError {
throw error
}
loadReplyDetailsEventIdStrCallsCount += 1
loadReplyDetailsEventIdStrReceivedEventIdStr = eventIdStr
loadReplyDetailsEventIdStrReceivedInvocations.append(eventIdStr)
if let loadReplyDetailsEventIdStrClosure = loadReplyDetailsEventIdStrClosure {
return try await loadReplyDetailsEventIdStrClosure(eventIdStr)
} else {
return loadReplyDetailsEventIdStrReturnValue
}
}
//MARK: - markAsRead //MARK: - markAsRead
open var markAsReadReceiptTypeThrowableError: Error? open var markAsReadReceiptTypeThrowableError: Error?

View File

@ -20,20 +20,25 @@ import MatrixRustSDK
@MainActor @MainActor
class AutoUpdatingRoomTimelineProviderMock: RoomTimelineProvider { class AutoUpdatingRoomTimelineProviderMock: RoomTimelineProvider {
private let innerUpdatePublisher: PassthroughSubject<[TimelineDiff], Never> static var timelineListener: TimelineListener?
private let innerPaginationStatePublisher: PassthroughSubject<PaginationState, Never> private let innerPaginationStatePublisher: PassthroughSubject<PaginationState, Never>
private let innerItems: [TimelineItemProxy] = []
init() { init() {
innerUpdatePublisher = .init()
innerPaginationStatePublisher = .init() innerPaginationStatePublisher = .init()
super.init(currentItems: [], let timelineMock = TimelineSDKMock()
timelineMock.addListenerListenerClosure = { listener in
Self.timelineListener = listener
return TaskHandleSDKMock()
}
super.init(timeline: timelineMock,
isLive: true, isLive: true,
updatePublisher: innerUpdatePublisher.eraseToAnyPublisher(),
paginationStatePublisher: innerPaginationStatePublisher.eraseToAnyPublisher()) paginationStatePublisher: innerPaginationStatePublisher.eraseToAnyPublisher())
Task.detached { [weak self] in Task.detached {
for _ in 0...100 { for _ in 0...100 {
try? await Task.sleep(for: .seconds(1)) try? await Task.sleep(for: .seconds(1))
@ -41,7 +46,7 @@ class AutoUpdatingRoomTimelineProviderMock: RoomTimelineProvider {
diff.changeReturnValue = .append diff.changeReturnValue = .append
diff.appendReturnValue = [TimelineItemFixtures.messageTimelineItem] diff.appendReturnValue = [TimelineItemFixtures.messageTimelineItem]
self?.innerUpdatePublisher.send([diff]) await Self.timelineListener?.onUpdate(diff: [diff])
} }
} }
} }

View File

@ -21,6 +21,8 @@ import MatrixRustSDK
class RoomTimelineProvider: RoomTimelineProviderProtocol { class RoomTimelineProvider: RoomTimelineProviderProtocol {
private var cancellables = Set<AnyCancellable>() private var cancellables = Set<AnyCancellable>()
private let serialDispatchQueue: DispatchQueue private let serialDispatchQueue: DispatchQueue
private var roomTimelineObservationToken: TaskHandle?
private let paginationStateSubject = CurrentValueSubject<PaginationState, Never>(.default) private let paginationStateSubject = CurrentValueSubject<PaginationState, Never>(.default)
var paginationState: PaginationState { var paginationState: PaginationState {
@ -28,8 +30,10 @@ class RoomTimelineProvider: RoomTimelineProviderProtocol {
} }
private let itemProxiesSubject: CurrentValueSubject<[TimelineItemProxy], Never> private let itemProxiesSubject: CurrentValueSubject<[TimelineItemProxy], Never>
var itemProxies: [TimelineItemProxy] { private(set) var itemProxies: [TimelineItemProxy] = [] {
itemProxiesSubject.value didSet {
itemProxiesSubject.send(itemProxies)
}
} }
var updatePublisher: AnyPublisher<([TimelineItemProxy], PaginationState), Never> { var updatePublisher: AnyPublisher<([TimelineItemProxy], PaginationState), Never> {
@ -45,28 +49,29 @@ class RoomTimelineProvider: RoomTimelineProviderProtocol {
membershipChangeSubject membershipChangeSubject
.eraseToAnyPublisher() .eraseToAnyPublisher()
} }
deinit {
roomTimelineObservationToken?.cancel()
}
init(currentItems: [TimelineItem], init(timeline: Timeline, isLive: Bool, paginationStatePublisher: AnyPublisher<PaginationState, Never>) {
isLive: Bool,
updatePublisher: AnyPublisher<[TimelineDiff], Never>,
paginationStatePublisher: AnyPublisher<PaginationState, Never>) {
serialDispatchQueue = DispatchQueue(label: "io.element.elementx.roomtimelineprovider", qos: .utility) serialDispatchQueue = DispatchQueue(label: "io.element.elementx.roomtimelineprovider", qos: .utility)
itemProxiesSubject = CurrentValueSubject<[TimelineItemProxy], Never>(currentItems.map(TimelineItemProxy.init)) itemProxiesSubject = CurrentValueSubject<[TimelineItemProxy], Never>([])
self.isLive = isLive self.isLive = isLive
// Manually call it here as the didSet doesn't work from constructors
itemProxiesSubject.send(itemProxies)
updatePublisher
.receive(on: serialDispatchQueue)
.sink { [weak self] in self?.updateItemsWithDiffs($0) }
.store(in: &cancellables)
paginationStatePublisher paginationStatePublisher
.sink { [weak self] in .sink { [weak self] in
self?.paginationStateSubject.send($0) self?.paginationStateSubject.send($0)
} }
.store(in: &cancellables) .store(in: &cancellables)
Task {
roomTimelineObservationToken = await timeline.addListener(listener: RoomTimelineListener { [weak self] timelineDiffs in
self?.serialDispatchQueue.sync {
self?.updateItemsWithDiffs(timelineDiffs)
}
})
}
} }
// MARK: - Private // MARK: - Private
@ -80,22 +85,19 @@ class RoomTimelineProvider: RoomTimelineProviderProtocol {
MXLog.verbose("Received timeline diff") MXLog.verbose("Received timeline diff")
let items = diffs itemProxies = diffs.reduce(itemProxies) { currentItems, diff in
.reduce(itemProxies) { currentItems, diff in guard let collectionDiff = buildDiff(from: diff, on: currentItems) else {
guard let collectionDiff = buildDiff(from: diff, on: currentItems) else { MXLog.error("Failed building CollectionDifference from \(diff)")
MXLog.error("Failed building CollectionDifference from \(diff)") return currentItems
return currentItems
}
guard let updatedItems = currentItems.applying(collectionDiff) else {
MXLog.error("Failed applying diff: \(collectionDiff)")
return currentItems
}
return updatedItems
} }
itemProxiesSubject.send(items) guard let updatedItems = currentItems.applying(collectionDiff) else {
MXLog.error("Failed applying diff: \(collectionDiff)")
return currentItems
}
return updatedItems
}
MXLog.verbose("Finished applying diffs, current items (\(itemProxies.count)) : \(itemProxies.map(\.debugIdentifier))") MXLog.verbose("Finished applying diffs, current items (\(itemProxies.count)) : \(itemProxies.map(\.debugIdentifier))")
} }
@ -251,3 +253,15 @@ enum DebugIdentifier {
case virtual(timelineID: String, dscription: String) case virtual(timelineID: String, dscription: String)
case unknown(timelineID: String) case unknown(timelineID: String)
} }
private final class RoomTimelineListener: TimelineListener {
private let onUpdateClosure: ([TimelineDiff]) -> Void
init(_ onUpdateClosure: @escaping ([TimelineDiff]) -> Void) {
self.onUpdateClosure = onUpdateClosure
}
func onUpdate(diff: [TimelineDiff]) {
onUpdateClosure(diff)
}
}

View File

@ -22,14 +22,9 @@ final class TimelineProxy: TimelineProxyProtocol {
private let timeline: Timeline private let timeline: Timeline
private var backPaginationStatusObservationToken: TaskHandle? private var backPaginationStatusObservationToken: TaskHandle?
private var roomTimelineObservationToken: TaskHandle?
// periphery:ignore - retaining purpose
private var timelineListener: RoomTimelineListener?
private let backPaginationSubscriptionSubject = CurrentValueSubject<PaginationStatus, Never>(.idle) private let backPaginationSubscriptionSubject = CurrentValueSubject<PaginationStatus, Never>(.idle)
private let forwardPaginationStatusSubject = CurrentValueSubject<PaginationStatus, Never>(.timelineEndReached) private let forwardPaginationStatusSubject = CurrentValueSubject<PaginationStatus, Never>(.timelineEndReached)
private let timelineUpdatesSubject = PassthroughSubject<[TimelineDiff], Never>()
let isLive: Bool let isLive: Bool
@ -40,7 +35,6 @@ final class TimelineProxy: TimelineProxyProtocol {
deinit { deinit {
backPaginationStatusObservationToken?.cancel() backPaginationStatusObservationToken?.cancel()
roomTimelineObservationToken?.cancel()
} }
init(timeline: Timeline, isLive: Bool) { init(timeline: Timeline, isLive: Bool) {
@ -54,15 +48,6 @@ final class TimelineProxy: TimelineProxyProtocol {
return return
} }
let timelineListener = RoomTimelineListener { [weak self] timelineDiffs in
self?.timelineUpdatesSubject.send(timelineDiffs)
}
self.timelineListener = timelineListener
let result = await timeline.addListener(listener: timelineListener)
roomTimelineObservationToken = result.itemsStream
let paginationStatePublisher = backPaginationSubscriptionSubject let paginationStatePublisher = backPaginationSubscriptionSubject
.combineLatest(forwardPaginationStatusSubject) .combineLatest(forwardPaginationStatusSubject)
.map { PaginationState(backward: $0.0, forward: $0.1) } .map { PaginationState(backward: $0.0, forward: $0.1) }
@ -70,10 +55,7 @@ final class TimelineProxy: TimelineProxyProtocol {
await subscribeToPagination() await subscribeToPagination()
innerTimelineProvider = await RoomTimelineProvider(currentItems: result.items, innerTimelineProvider = await RoomTimelineProvider(timeline: timeline, isLive: isLive, paginationStatePublisher: paginationStatePublisher)
isLive: isLive,
updatePublisher: timelineUpdatesSubject.eraseToAnyPublisher(),
paginationStatePublisher: paginationStatePublisher)
} }
func fetchDetails(for eventID: String) { func fetchDetails(for eventID: String) {
@ -533,18 +515,6 @@ final class TimelineProxy: TimelineProxyProtocol {
} }
} }
private final class RoomTimelineListener: TimelineListener {
private let onUpdateClosure: ([TimelineDiff]) -> Void
init(_ onUpdateClosure: @escaping ([TimelineDiff]) -> Void) {
self.onUpdateClosure = onUpdateClosure
}
func onUpdate(diff: [TimelineDiff]) {
onUpdateClosure(diff)
}
}
private final class RoomPaginationStatusListener: PaginationStatusListener { private final class RoomPaginationStatusListener: PaginationStatusListener {
private let onUpdateClosure: (LiveBackPaginationStatus) -> Void private let onUpdateClosure: (LiveBackPaginationStatus) -> Void

View File

@ -49,7 +49,7 @@ packages:
# Element/Matrix dependencies # Element/Matrix dependencies
MatrixRustSDK: MatrixRustSDK:
url: https://github.com/element-hq/matrix-rust-components-swift url: https://github.com/element-hq/matrix-rust-components-swift
exactVersion: 1.0.11 exactVersion: 1.0.13
# path: ../matrix-rust-sdk # path: ../matrix-rust-sdk
Compound: Compound:
url: https://github.com/element-hq/compound-ios url: https://github.com/element-hq/compound-ios