Skip to content

Commit d76bac0

Browse files
committed
fix(plugin): refactor InMemoryFileObjectStateBackingStore to use an LRU cache (#183)
Resolves: #183
1 parent 7edf4d7 commit d76bac0

File tree

1 file changed

+18
-4
lines changed

1 file changed

+18
-4
lines changed

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/InMemoryFileObjectStateBackingStore.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,28 +26,33 @@
2626
import org.slf4j.LoggerFactory;
2727

2828
import java.util.Collections;
29+
import java.util.LinkedHashMap;
2930
import java.util.Map;
30-
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.concurrent.TimeUnit;
3232
import java.util.concurrent.atomic.AtomicBoolean;
3333

3434
/**
35-
* In-memory {@link StateBackingStore} implementation.
35+
* An in-memory {@link StateBackingStore} implementation that uses an LRU cache based on HashMap.
3636
*/
3737
public class InMemoryFileObjectStateBackingStore implements FileObjectStateBackingStore {
3838

3939
private static final Logger LOG = LoggerFactory.getLogger(InMemoryFileObjectStateBackingStore.class);
4040

41-
private final ConcurrentHashMap<String, FileObject> objects = new ConcurrentHashMap<>();
41+
private static final int DEFAULT_MAX_SIZE_CAPACITY = 10_000;
42+
43+
private final Map<String, FileObject> objects;
4244

4345
private StateBackingStore.UpdateListener<FileObject> listener;
4446

4547
private final AtomicBoolean started = new AtomicBoolean(false);
4648

47-
public InMemoryFileObjectStateBackingStore() { }
49+
public InMemoryFileObjectStateBackingStore() {
50+
this.objects = Collections.synchronizedMap(createLRUCache(DEFAULT_MAX_SIZE_CAPACITY));
51+
}
4852

4953
@VisibleForTesting
5054
public InMemoryFileObjectStateBackingStore(final Map<String, FileObject> objects) {
55+
this();
5156
this.objects.putAll(objects);
5257
}
5358

@@ -150,4 +155,13 @@ public void setUpdateListener(final UpdateListener<FileObject> listener) {
150155
public UpdateListener<FileObject> getListener() {
151156
return listener;
152157
}
158+
159+
private static <K, V> Map<K, V> createLRUCache(final int maxCacheSize) {
160+
return new LinkedHashMap<>(maxCacheSize + 1, 1.01f, true) {
161+
@Override
162+
protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
163+
return size() > maxCacheSize;
164+
}
165+
};
166+
}
153167
}

0 commit comments

Comments
 (0)