Skip to content

Commit af80a09

Browse files
committed
fixes reflector list expiration
1 parent b8cf5e6 commit af80a09

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

+28
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import io.kubernetes.client.common.KubernetesObject;
1717
import io.kubernetes.client.informer.EventType;
1818
import io.kubernetes.client.informer.ListerWatcher;
19+
import io.kubernetes.client.openapi.ApiException;
1920
import io.kubernetes.client.openapi.models.V1ListMeta;
2021
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2122
import io.kubernetes.client.util.CallGeneratorParams;
2223
import io.kubernetes.client.util.Strings;
2324
import io.kubernetes.client.util.Watchable;
2425
import java.io.IOException;
2526
import java.net.ConnectException;
27+
import java.net.HttpURLConnection;
2628
import java.time.Duration;
2729
import java.util.List;
2830
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +39,9 @@ public class ReflectorRunnable<
3739
private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class);
3840

3941
private String lastSyncResourceVersion;
42+
43+
private boolean isLastSyncResourceVersionUnavailable;
44+
4045
private Watchable<ApiType> watch;
4146

4247
private ListerWatcher<ApiType, ApiListType> listerWatcher;
@@ -86,6 +91,7 @@ public void run() {
8691
}
8792
this.syncWith(items, resourceVersion);
8893
this.lastSyncResourceVersion = resourceVersion;
94+
this.isLastSyncResourceVersionUnavailable = false;
8995

9096
if (log.isDebugEnabled()) {
9197
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
@@ -145,6 +151,13 @@ public void run() {
145151
closeWatch();
146152
}
147153
}
154+
} catch (ApiException e) {
155+
if (e.getCode() == HttpURLConnection.HTTP_GONE) {
156+
log.info(
157+
"ResourceVersion {} expired, will retry w/o resourceVersion at the next time",
158+
getRelistResourceVersion());
159+
isLastSyncResourceVersionUnavailable = true;
160+
}
148161
} catch (Throwable t) {
149162
this.exceptionHandler.accept(apiTypeClass, t);
150163
}
@@ -175,8 +188,23 @@ public String getLastSyncResourceVersion() {
175188
return lastSyncResourceVersion;
176189
}
177190

191+
public boolean isLastSyncResourceVersionUnavailable() {
192+
return isLastSyncResourceVersionUnavailable;
193+
}
194+
178195
private String getRelistResourceVersion() {
196+
if (isLastSyncResourceVersionUnavailable) {
197+
// Since this reflector makes paginated list requests, and all paginated list requests skip
198+
// the watch cache
199+
// if the lastSyncResourceVersion is unavailable, we set ResourceVersion="" and list again to
200+
// re-establish reflector
201+
// to the latest available ResourceVersion, using a consistent read from etcd.
202+
return "";
203+
}
179204
if (Strings.isNullOrEmpty(lastSyncResourceVersion)) {
205+
// For performance reasons, initial list performed by reflector uses "0" as resource version
206+
// to allow it to
207+
// be served from the watch cache if it is enabled.
180208
return "0";
181209
}
182210
return lastSyncResourceVersion;

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.kubernetes.client.informer.cache;
1414

15+
import static org.junit.Assert.assertFalse;
1516
import static org.mockito.Mockito.any;
1617
import static org.mockito.Mockito.never;
1718
import static org.mockito.Mockito.times;
@@ -28,6 +29,7 @@
2829
import io.kubernetes.client.util.CallGeneratorParams;
2930
import io.kubernetes.client.util.Watch;
3031
import io.kubernetes.client.util.Watchable;
32+
import java.net.HttpURLConnection;
3133
import java.time.Duration;
3234
import java.util.concurrent.atomic.AtomicReference;
3335
import org.awaitility.Awaitility;
@@ -215,4 +217,42 @@ public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
215217
reflectorRunnable.stop();
216218
}
217219
}
220+
221+
@Test
222+
public void testReflectorListShouldHandleExpiredResourceVersion() throws ApiException {
223+
String expectedResourceVersion = "100";
224+
when(listerWatcher.list(any()))
225+
.thenReturn(
226+
new V1PodList().metadata(new V1ListMeta().resourceVersion(expectedResourceVersion)));
227+
// constantly failing watches will make the reflector run only one time
228+
when(listerWatcher.watch(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
229+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
230+
new ReflectorRunnable<>(V1Pod.class, listerWatcher, deltaFIFO);
231+
try {
232+
Thread thread = new Thread(reflectorRunnable::run);
233+
thread.setDaemon(true);
234+
thread.start();
235+
Awaitility.await()
236+
.atMost(Duration.ofSeconds(1))
237+
.pollInterval(Duration.ofMillis(100))
238+
.until(
239+
() -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
240+
assertFalse(reflectorRunnable.isLastSyncResourceVersionUnavailable());
241+
} finally {
242+
reflectorRunnable.stop();
243+
}
244+
245+
try {
246+
when(listerWatcher.list(any())).thenThrow(new ApiException(HttpURLConnection.HTTP_GONE, ""));
247+
Thread thread = new Thread(reflectorRunnable::run);
248+
thread.setDaemon(true);
249+
thread.start();
250+
Awaitility.await()
251+
.atMost(Duration.ofSeconds(5))
252+
.pollInterval(Duration.ofMillis(100))
253+
.until(() -> reflectorRunnable.isLastSyncResourceVersionUnavailable());
254+
} finally {
255+
reflectorRunnable.stop();
256+
}
257+
}
218258
}

0 commit comments

Comments
 (0)