Commit b8dd10b

feat(plugin): allow to configure cache.max.size.capacity for InMemoryFileObjectStateBackingStore
Parent: 4f892a8

File tree: 4 files changed (+116 / -39 lines)

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/InMemoryFileObjectStateBackingStore.java

Lines changed: 77 additions & 7 deletions

```diff
@@ -22,6 +22,8 @@
 import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
 import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
 import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +32,7 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 
 /**
  * An in-memory {@link StateBackingStore} implementation that uses an LRU cache based on HashMap.
@@ -40,22 +43,45 @@ public class InMemoryFileObjectStateBackingStore implements FileObjectStateBacki
 
     private static final int DEFAULT_MAX_SIZE_CAPACITY = 10_000;
 
-    private final Map<String, FileObject> objects;
+    private volatile Map<String, FileObject> objects;
 
     private StateBackingStore.UpdateListener<FileObject> listener;
 
     private final AtomicBoolean started = new AtomicBoolean(false);
 
-    public InMemoryFileObjectStateBackingStore() {
-        this.objects = Collections.synchronizedMap(createLRUCache(DEFAULT_MAX_SIZE_CAPACITY));
-    }
+    /**
+     * Creates a new {@link InMemoryFileObjectStateBackingStore} instance.
+     */
+    public InMemoryFileObjectStateBackingStore() { }
 
     @VisibleForTesting
     public InMemoryFileObjectStateBackingStore(final Map<String, FileObject> objects) {
-        this();
+        configure(Collections.emptyMap());
         this.objects.putAll(objects);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void configure(final Map<String, ?> configs) {
+        FileObjectStateBackingStore.super.configure(configs);
+        int cacheMaxCapacity = new Config(configs).getCacheMaxCapacity();
+        this.objects = Collections.synchronizedMap(createLRUCache(cacheMaxCapacity, objectEntry -> {
+            if (!objectEntry.getValue().status().isDone()) {
+                LOG.warn(
+                        "Evicting a file-object state '{}' from in-memory state with a non terminal"
+                        + " status (i.e. 'CLEANED'). This may happen if you are processing more files than the"
+                        + " max-capacity of the InMemoryFileObjectStateBackingStore before committing offsets"
+                        + " for tasks successfully. Please consider increasing the value of"
+                        + " 'tasks.file.status.storage.cache.max.size.capacity' through"
+                        + " the connector's configuration.",
+                        objectEntry.getValue().metadata().stringURI()
+                );
+            }
+        }));
+    }
+
     /**
      * {@inheritDoc}
      */
```
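
The net effect of this hunk is that the cache is now built lazily in `configure()` instead of in the constructor, so its capacity can come from the connector's configuration. A minimal usage sketch under that reading (the capacity value `50000` is an arbitrary example, not taken from the commit):

```java
import io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore;

import java.util.Map;

public class ConfigureStoreSketch {
    public static void main(String[] args) {
        // After this commit, the no-arg constructor leaves the cache unset;
        // configure() creates the synchronized LRU map.
        InMemoryFileObjectStateBackingStore store = new InMemoryFileObjectStateBackingStore();

        // Override the default capacity of 10_000 entries (example value).
        store.configure(Map.of(
                "tasks.file.status.storage.cache.max.size.capacity", "50000"));
    }
}
```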
```diff
@@ -156,12 +182,56 @@ public UpdateListener<FileObject> getListener() {
         return listener;
     }
 
-    private static <K, V> Map<K, V> createLRUCache(final int maxCacheSize) {
+    private static <K, V> Map<K, V> createLRUCache(final int maxCacheSize,
+                                                   final Consumer<Map.Entry<K, V>> callbackOnRemoveEldest) {
         return new LinkedHashMap<>(maxCacheSize + 1, 1.01f, true) {
             @Override
             protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
-                return size() > maxCacheSize;
+                boolean remove = size() > maxCacheSize;
+                if (remove) {
+                    callbackOnRemoveEldest.accept(eldest);
+                }
+                return remove;
             }
         };
     }
+
+    private static final class Config extends AbstractConfig {
+
+        private static final String GROUP = "InMemoryFileObjectStateBackingStore";
+
+        public static final String TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG
+                = "tasks.file.status.storage.cache.max.size.capacity";
+        private static final String TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_DOC
+                = "The max size capacity of the LRU in-memory cache (default: 10_000).";
+
+        /**
+         * Creates a new {@link Config} instance.
+         *
+         * @param originals the configuration properties.
+         */
+        public Config(final Map<?, ?> originals) {
+            super(configDef(), originals, false);
+        }
+
+        public int getCacheMaxCapacity() {
+            return this.getInt(TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG);
+        }
+
+        private static ConfigDef configDef() {
+            int groupCounter = 0;
+            return new ConfigDef()
+                    .define(
+                            TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG,
+                            ConfigDef.Type.INT,
+                            DEFAULT_MAX_SIZE_CAPACITY,
+                            ConfigDef.Importance.LOW,
+                            TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_DOC,
+                            GROUP,
+                            groupCounter++,
+                            ConfigDef.Width.NONE,
+                            TASKS_FILE_STATUS_STORAGE_CACHE_MAX_SIZE_CAPACITY_CONFIG
+                    );
+        }
+    }
 }
```
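
The eviction hook above is plain `java.util.LinkedHashMap` machinery: the third constructor argument enables access order, and `removeEldestEntry` is invoked by `put` once the map grows past capacity. A self-contained sketch of the same pattern, using hypothetical names unrelated to the FilePulse code base:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class LruCacheDemo {

    // Access-ordered LinkedHashMap: 'true' makes get() move entries to the tail,
    // so the head is always the least-recently-used entry.
    static <K, V> Map<K, V> lruCache(int maxSize, Consumer<Map.Entry<K, V>> onEvict) {
        return new LinkedHashMap<>(maxSize + 1, 1.01f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                boolean evict = size() > maxSize;
                if (evict) {
                    onEvict.accept(eldest); // notify before the entry is dropped
                }
                return evict;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> cache = lruCache(2, e -> System.out.println("evicted: " + e.getKey()));
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");      // touch 'a' so 'b' becomes the eldest
        cache.put("c", "3"); // prints "evicted: b"
        System.out.println(cache.keySet()); // [a, c]
    }
}
```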

docs/content/en/docs/Developer Guide/configuration.md

Lines changed: 8 additions & 1 deletion

```diff
@@ -1,5 +1,5 @@
 ---
-date: 2022-03-01
+date: 2022-03-02
 title: "Configuration"
 linkTitle: "Configuration"
 weight: 20
@@ -79,6 +79,13 @@ The `InMemoryFileObjectStateBackingStore` implement is not fault-tolerant and sh
 | `tasks.file.status.storage.topic.partitions` | The number of partitions to be used for the status storage topic. | int | *-* | LOW |
 | `tasks.file.status.storage.topic.replication.factor` | The replication factor to be used for the status storage topic. | float | *-* | LOW |
 
+
+**Properties for configuring the `InMemoryFileObjectStateBackingStore` class**
+
+| Configuration                                        | Description                                                  | Type  | Default | Importance | Since  |
+|------------------------------------------------------|--------------------------------------------------------------|-------|---------|------------|--------|
+| `tasks.file.status.storage.cache.max.size.capacity`  | Specifies the max size capacity of the LRU in-memory cache.  | `int` | *10000* | LOW        | v2.5.0 |
+
 In addition, to override the default configuration for the internal consumer and producer clients,
 you can use one of the following override prefixes :
```

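The default/override behaviour documented in this table follows from Kafka's `ConfigDef`: a missing property falls back to the declared default, and string values are coerced to the declared type. A small standalone sketch mirroring the definition added in this commit (demo class name is hypothetical):

```java
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class ConfigDefDemo {

    // A minimal ConfigDef mirroring the one added in this commit.
    static final ConfigDef DEF = new ConfigDef()
            .define("tasks.file.status.storage.cache.max.size.capacity",
                    ConfigDef.Type.INT,
                    10_000,                     // default when the property is absent
                    ConfigDef.Importance.LOW,
                    "The max size capacity of the LRU in-memory cache.");

    public static void main(String[] args) {
        // Kafka parses string values to the declared type automatically.
        AbstractConfig withOverride = new AbstractConfig(DEF, Map.of(
                "tasks.file.status.storage.cache.max.size.capacity", "50000"));
        System.out.println(withOverride.getInt("tasks.file.status.storage.cache.max.size.capacity")); // 50000

        AbstractConfig withDefault = new AbstractConfig(DEF, Map.of());
        System.out.println(withDefault.getInt("tasks.file.status.storage.cache.max.size.capacity")); // 10000
    }
}
```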
docs/content/en/docs/Developer Guide/filters-chain-definition.md

Lines changed: 21 additions & 21 deletions

```diff
@@ -40,27 +40,27 @@ filters.ParseLog4jLog.overwrite=logmessage
 
 These filters are available for use with Kafka Connect File Pulse:
 
-| Filter | Description | Since
-|--- | --- | --- |
-| [AppendFilter](../filters#appendfilter) | Appends one or more values to an existing or non-existing array field | |
-| [ConvertFilter](../filters#convertfilter) | Converts a message field's value to a specific type | |
-| [DateFilter](../filters#datefilter) | Converts a field's value containing a date to a unix epoch time | |
-| [DelimitedRowFilter](../filters#delimitedrowfilter) | Parses a message field's value containing columns delimited by a separator into a struct | |
-| [DropFilter](../filters#dropfilter) | Drops messages satisfying a specific condition without throwing exception. | |
-| [ExcludeFilter](../filters#excludefilter) | Excludes one or more fields from the input record. | `v1.4.0` |
-| [ExplodeFilter](../filters#explodefilter) | Explodes an array or list field into separate records. | `v1.4.0` |
-| [FailFilter](../filters#failfilter) | Throws an exception when a message satisfy a specific condition | |
-| [GrokFilter](../filters#grokfilter) | Parses an unstructured message field's value to a struct by combining Grok patterns | |
-| [GroupRowFilter](../filters#grouprowfilter) | Regroups multiple following messages into a single message by composing a grouping key| |
-| [JoinFilter](../filters#joinfilter) | Joins values of an array field with a specified separator | |
-| [JSONFilter](../filters#jsonfilter) | Unmarshallings a JSON message field's value to a complex struct | |
-| [MoveFilter](../filters#movefilter) | Moves an existing record field's value to a specific target path | `v1.5.0` |
-| [MultiRowFilter](../filters#multirowfilter) | Combines following message lines into single one by combining patterns | |
-| [NullValueFilter](../filters#nullvaluefilter) | Combines following message lines into single one by combining patterns | `v2.3.0` |
-| [RenameFilter](../filters#renamefilter) | Renames a message field | |
-| [SplitFilter](../filters#splitfilter) | Splits a message field's value to array | |
-| [XmlToJsonFilter](../filters#xmltojsonfilter) | Parses an XML record-field and convert it to a JSON string | `v2.4.0` |
-| [XmlToStructFilter](../filters#xmltostructfilter) | Parses an XML record-field into STRUCT | `v2.4.0` |
+| Filter                                               | Description                                                                               | Since    |
+|------------------------------------------------------|-------------------------------------------------------------------------------------------|----------|
+| [AppendFilter](../filters#appendfilter)              | Appends one or more values to an existing or non-existing array field                     |          |
+| [ConvertFilter](../filters#convertfilter)            | Converts a message field's value to a specific type                                       |          |
+| [DateFilter](../filters#datefilter)                  | Converts a field's value containing a date to a unix epoch time                           |          |
+| [DelimitedRowFilter](../filters#delimitedrowfilter)  | Parses a message field's value containing columns delimited by a separator into a struct  |          |
+| [DropFilter](../filters#dropfilter)                  | Drops messages satisfying a specific condition without throwing exception.                |          |
+| [ExcludeFilter](../filters#excludefilter)            | Excludes one or more fields from the input record.                                        | `v1.4.0` |
+| [ExplodeFilter](../filters#explodefilter)            | Explodes an array or list field into separate records.                                    | `v1.4.0` |
+| [FailFilter](../filters#failfilter)                  | Throws an exception when a message satisfy a specific condition                           |          |
+| [GrokFilter](../filters#grokfilter)                  | Parses an unstructured message field's value to a struct by combining Grok patterns       |          |
+| [GroupRowFilter](../filters#grouprowfilter)          | Regroups multiple following messages into a single message by composing a grouping key    |          |
+| [JoinFilter](../filters#joinfilter)                  | Joins values of an array field with a specified separator                                 |          |
+| [JSONFilter](../filters#jsonfilter)                  | Unmarshallings a JSON message field's value to a complex struct                           |          |
+| [MoveFilter](../filters#movefilter)                  | Moves an existing record field's value to a specific target path                          | `v1.5.0` |
+| [MultiRowFilter](../filters#multirowfilter)          | Combines following message lines into single one by combining patterns                    |          |
+| [NullValueFilter](../filters#nullvaluefilter)        | Combines following message lines into single one by combining patterns                    | `v2.3.0` |
+| [RenameFilter](../filters#renamefilter)              | Renames a message field                                                                   |          |
+| [SplitFilter](../filters#splitfilter)                | Splits a message field's value to array                                                   |          |
+| [XmlToJsonFilter](../filters#xmltojsonfilter)        | Parses an XML record-field and convert it to a JSON string                                | `v2.4.0` |
+| [XmlToStructFilter](../filters#xmltostructfilter)    | Parses an XML record-field into STRUCT                                                    | `v2.4.0` |
 
 ## Difference between Kafka Connect Single Message Transforms (SMT) functionality
 

docs/content/en/docs/Developer Guide/handling-failures.md

Lines changed: 10 additions & 10 deletions

```diff
@@ -14,10 +14,10 @@ But, you can also configure each filter to either ignore errors or to branch to
 
 ## Configuration
 
-| Configuration | Description | Type | Default | Importance |
-| --------------| --------------|-----------| --------- | ------------- |
-| `withOnFailure` | List of filters aliases to apply on each data after failure (order is important). | list | *-* | medium |
-| `ignoreFailure` | Ignore failure and continue pipeline filters | boolean | *false* | medium |
+| Configuration   | Description                                                                         | Type    | Default | Importance |
+|-----------------|-------------------------------------------------------------------------------------|---------|---------|------------|
+| `withOnFailure` | List of filters aliases to apply on each data after failure (order is important).  | list    | *-*     | medium     |
+| `ignoreFailure` | Ignore failure and continue pipeline filters                                        | boolean | *false* | medium     |
 
 
 ## Ignoring failure
@@ -51,12 +51,12 @@ Sub-filter chains can be defined using the property `withOnFailure`.
 
 Within an error filter chain, some additional fields are available to each filter context.
 
-| Predefined Fields / ScEL | Description | Type |
-|--- | --- |--- |
-| `$error.exceptionMessage` | The exception message | `string` |
-| `$error.exceptionStacktrace` | The exception stack-trace | `string` |
-| `$error.exceptionClassName` | The exception class name | `string` |
-| `$error.filter` | The name of the filter that threw the exception | `string` |
+| Predefined Fields / ScEL     | Description                                      | Type     |
+|------------------------------|--------------------------------------------------|----------|
+| `$error.exceptionMessage`    | The exception message                            | `string` |
+| `$error.exceptionStacktrace` | The exception stack-trace                        | `string` |
+| `$error.exceptionClassName`  | The exception class name                         | `string` |
+| `$error.filter`              | The name of the filter that threw the exception  | `string` |
 
 ### Example
 