Skip to content

Commit f8c0304

Browse files
authored
Properly cleanup thread locals for non-CoroutineDispatcher-intercepte… (#4303)
* Properly cleanup thread locals for non-CoroutineDispatcher-intercepted continuations There was one codepath not covered by undispatched thread local cleanup procedure: when a custom ContinuationInterceptor is used and the scoped coroutine (i.e. withContext) is completed in-place without suspensions. Fixed with the introduction of the corresponding machinery for ScopeCoroutine Fixes #4296
1 parent 2cafea4 commit f8c0304

File tree

5 files changed

+133
-6
lines changed

5 files changed

+133
-6
lines changed

kotlinx-coroutines-core/common/src/AbstractCoroutine.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package kotlinx.coroutines
55
import kotlinx.coroutines.CoroutineStart.*
66
import kotlinx.coroutines.intrinsics.*
77
import kotlin.coroutines.*
8+
import kotlinx.coroutines.internal.ScopeCoroutine
89

910
/**
1011
* Abstract base class for implementation of coroutines in coroutine builders.
@@ -100,6 +101,15 @@ public abstract class AbstractCoroutine<in T>(
100101
afterResume(state)
101102
}
102103

104+
/**
105+
* Invoked when the corresponding `AbstractCoroutine` was **conceptually** resumed, but not mechanically.
106+
* Currently, this function only invokes `resume` on the underlying continuation for [ScopeCoroutine]
107+
* or does nothing otherwise.
108+
*
109+
* Examples of resumes:
110+
* - `afterCompletion` calls when the corresponding `Job` changed its state (i.e. got cancelled)
111+
* - [AbstractCoroutine.resumeWith] was invoked
112+
*/
103113
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)
104114

105115
internal final override fun handleOnCompletionException(exception: Throwable) {

kotlinx-coroutines-core/common/src/internal/Scopes.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@ internal open class ScopeCoroutine<in T>(
2323
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
2424
}
2525

26+
/**
27+
* Invoked when a scoped coorutine was completed in an undispatched manner directly
28+
* at the place of its start because it never suspended.
29+
*/
30+
open fun afterCompletionUndispatched() {
31+
}
32+
2633
override fun afterResume(state: Any?) {
2734
// Resume direct because scope is already in the correct context
2835
uCont.resumeWith(recoverResult(state, uCont))

kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
7979
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
8080
val state = makeCompletingOnce(result)
8181
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
82+
afterCompletionUndispatched()
8283
return if (state is CompletedExceptionally) { // (3)
8384
when {
8485
shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)

kotlinx-coroutines-core/jvm/src/CoroutineContext.kt

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
185185
* `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of
186186
* undispatched coroutines.
187187
* Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap`
188-
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected.
188+
* that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected
189+
* when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses.
189190
* When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals
190191
* start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access.
191192
* (You can read more about this effect as "GC nepotism").
@@ -253,18 +254,26 @@ internal actual class UndispatchedCoroutine<in T>actual constructor (
253254
}
254255
}
255256

257+
override fun afterCompletionUndispatched() {
258+
clearThreadLocal()
259+
}
260+
256261
override fun afterResume(state: Any?) {
262+
clearThreadLocal()
263+
// resume undispatched -- update context but stay on the same dispatcher
264+
val result = recoverResult(state, uCont)
265+
withContinuationContext(uCont, null) {
266+
uCont.resumeWith(result)
267+
}
268+
}
269+
270+
private fun clearThreadLocal() {
257271
if (threadLocalIsSet) {
258272
threadStateToRecover.get()?.let { (ctx, value) ->
259273
restoreThreadContext(ctx, value)
260274
}
261275
threadStateToRecover.remove()
262276
}
263-
// resume undispatched -- update context but stay on the same dispatcher
264-
val result = recoverResult(state, uCont)
265-
withContinuationContext(uCont, null) {
266-
uCont.resumeWith(result)
267-
}
268277
}
269278
}
270279

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package kotlinx.coroutines
2+
3+
import kotlinx.coroutines.testing.TestBase
4+
import java.lang.ref.WeakReference
5+
import kotlin.coroutines.AbstractCoroutineContextElement
6+
import kotlin.coroutines.Continuation
7+
import kotlin.coroutines.ContinuationInterceptor
8+
import kotlin.coroutines.CoroutineContext
9+
import kotlin.test.Test
10+
11+
/*
12+
* This is an adapted verion of test from #4296.
13+
*
14+
* qwwdfsad: the test relies on System.gc() actually collecting the garbage.
15+
* If these tests flake on CI, first check that JDK/GC setup in not an issue.
16+
*/
17+
class ThreadLocalCustomContinuationInterceptorTest : TestBase() {
18+
19+
private class CustomContinuationInterceptor(private val delegate: ContinuationInterceptor) :
20+
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
21+
22+
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
23+
return delegate.interceptContinuation(continuation)
24+
}
25+
}
26+
27+
private class CustomNeverEqualContinuationInterceptor(private val delegate: ContinuationInterceptor) :
28+
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
29+
30+
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
31+
return delegate.interceptContinuation(continuation)
32+
}
33+
34+
override fun equals(other: Any?) = false
35+
}
36+
37+
@Test(timeout = 20_000L)
38+
fun testDefaultDispatcherNoSuspension() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = false)
39+
40+
@Test(timeout = 20_000L)
41+
fun testDefaultDispatcher() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = true)
42+
43+
44+
@Test(timeout = 20_000L)
45+
fun testNonCoroutineDispatcher() = ensureCoroutineContextGCed(
46+
CustomContinuationInterceptor(Dispatchers.Default),
47+
suspend = true
48+
)
49+
50+
@Test(timeout = 20_000L)
51+
fun testNonCoroutineDispatcherSuspension() = ensureCoroutineContextGCed(
52+
CustomContinuationInterceptor(Dispatchers.Default),
53+
suspend = false
54+
)
55+
56+
// Note asymmetric equals codepath never goes through the undispatched withContext, thus the separate test case
57+
58+
@Test(timeout = 20_000L)
59+
fun testNonCoroutineDispatcherAsymmetricEquals() =
60+
ensureCoroutineContextGCed(
61+
CustomNeverEqualContinuationInterceptor(Dispatchers.Default),
62+
suspend = true
63+
)
64+
65+
@Test(timeout = 20_000L)
66+
fun testNonCoroutineDispatcherAsymmetricEqualsSuspension() =
67+
ensureCoroutineContextGCed(
68+
CustomNeverEqualContinuationInterceptor(Dispatchers.Default),
69+
suspend = false
70+
)
71+
72+
73+
@Volatile
74+
private var letThatSinkIn: Any = "What is my purpose? To frag the garbage collctor"
75+
76+
private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) {
77+
fun forceGcUntilRefIsCleaned(ref: WeakReference<CoroutineName>) {
78+
while (ref.get() != null) {
79+
System.gc()
80+
letThatSinkIn = LongArray(1024 * 1024)
81+
}
82+
}
83+
84+
runTest {
85+
lateinit var ref: WeakReference<CoroutineName>
86+
val job = GlobalScope.launch(coroutineContext) {
87+
val coroutineName = CoroutineName("Yo")
88+
ref = WeakReference(coroutineName)
89+
withContext(coroutineName) {
90+
if (suspend) {
91+
delay(1)
92+
}
93+
}
94+
}
95+
job.join()
96+
97+
forceGcUntilRefIsCleaned(ref)
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)