Skip to content

Commit 33dfd45

Browse files
committed
fix possible fail to dispose and re-lock
1 parent a8e409b commit 33dfd45

File tree

1 file changed

+25
-15
lines changed

1 file changed

+25
-15
lines changed

RxSwift/Observables/Merge.swift

+25-15
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,7 @@ private final class MergeLimitedSinkIter<SourceElement, SourceSequence: Observab
169169
self.parent.dispose()
170170
case .completed:
171171
self.parent.group.remove(for: self.disposeKey)
172-
if let next = self.parent.queue.dequeue() {
173-
_ = CurrentThreadScheduler.instance.schedule(()) { _ in
174-
self.parent.subscribe(next, group: self.parent.group)
175-
return self.parent.group
176-
}
177-
}
178-
else {
179-
self.parent.activeCount -= 1
180-
181-
if self.parent.stopped && self.parent.activeCount == 0 {
182-
self.parent.forwardOn(.completed)
183-
self.parent.dispose()
184-
}
185-
}
172+
self.parent.dequeueNextAndSubscribe()
186173
}
187174
}
188175
}
@@ -239,7 +226,8 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
239226
return self.group
240227
}
241228

242-
func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) {
229+
@discardableResult
230+
func subscribe(_ innerSource: SourceSequence, group: CompositeDisposable) -> Disposable {
243231
let subscription = SingleAssignmentDisposable()
244232

245233
let key = group.insert(subscription)
@@ -250,6 +238,28 @@ private class MergeLimitedSink<SourceElement, SourceSequence: ObservableConverti
250238
let disposable = innerSource.asObservable().subscribe(observer)
251239
subscription.setDisposable(disposable)
252240
}
241+
return subscription
242+
}
243+
244+
func dequeueNextAndSubscribe() {
245+
if let next = queue.dequeue() {
246+
// subscribing immediately can produce values immediately which can re-enter and cause stack overflows
247+
let disposable = CurrentThreadScheduler.instance.schedule(()) { _ in
248+
// lock again
249+
self.lock.performLocked {
250+
self.subscribe(next, group: self.group)
251+
}
252+
}
253+
_ = group.insert(disposable)
254+
}
255+
else {
256+
activeCount -= 1
257+
258+
if stopped && activeCount == 0 {
259+
forwardOn(.completed)
260+
dispose()
261+
}
262+
}
253263
}
254264

255265
func performMap(_ element: SourceElement) throws -> SourceSequence {

0 commit comments

Comments
 (0)