|
| 1 | +--- |
| 2 | +title: "Connect FilePulse 2.4 is Released 🚀" |
| 3 | +linkTitle: "Connect FilePulse 2.4 is Released" |
| 4 | +date: 2021-10-04 |
| 5 | +description: "" |
| 6 | +author: Florian Hussonnois ([@fhussonnois](https://twitter.com/fhussonnois)) |
| 7 | +--- |
| 8 | + |
| 9 | +I am pleased to announce the release of Connect FilePulse 2.4. This release brings new built-in expression functions, processing filters as well as some minor improvements and bug fixes. |
| 10 | + |
| 11 | +## Simple Connect Expression Language |
| 12 | + |
| 13 | +This release packs with new built-in functions to enrich the powerful expression language provided by connect FilePulse: |
| 14 | + |
| 15 | +**Boolean Functions** |
| 16 | + |
| 17 | +| Function | Description | |
| 18 | +| -----------------|-------------| |
| 19 | +| `and` | Checks if all of the given conditional expressions are `true`. | |
| 20 | +| `gt` | Executes "*greater than operation*" on two values and returns `true` if the first value is greater than the second value, `false`, otherwise. | |
| 21 | +| `if` | Evaluates the given boolean expression and returns one value if `true` and another value if `false`. | |
| 22 | +| `lt` | Executes "*less than operation*" on two values and returns `true` if the first value is less than the second value, `false`, otherwise. | |
| 23 | +| `not` | Reverses a boolean value | |
| 24 | +| `or` | Checks if at least one of the given conditional expressions is `true`. | |
| 25 | + |
| 26 | +**Date and time Functions** |
| 27 | + |
| 28 | +| Function | Description | |
| 29 | +| -----------------|-------------| |
| 30 | +| `timestamp_diff` | Calculates the amount of time between two epoch times in seconds or milliseconds. For more information on `unit` see [ChronoUnit](https://docs.oracle.com/javase/8/docs/api/java/time/temporal/ChronoUnit.html). | |
| 31 | +| `to_timestamp` | Parses a given string value and returns the epoch-time in milliseconds. | |
| 32 | +| `unix_timestamp` | Returns the current time in milliseconds. | |
| 33 | + |
| 34 | +These new functions allows for specifying more complex conditions on configured [*Processing Filters*](https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/filters/). |
| 35 | +For example, we can use them to remove all records retrieved from files that are older than 24 hours. |
| 36 | + |
| 37 | +```properties |
| 38 | +filters=DropTooLateFiles |
| 39 | +filters.DropTooLateFiles.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter |
| 40 | +filters.DropTooLateFiles.if="{{ gt(timestamp_diff('HOURS', $metadata.lastModified, unix_timestamp()), 24) }}" |
| 41 | +filters.DropTooLateFiles.invert=false |
| 42 | +``` |
| 43 | + |
| 44 | +# XML Processing |
| 45 | + |
| 46 | +XML is still widely used in legacy systems. To ease the integration of this data in Kafka, |
| 47 | +this release introduces two new *Processing Filters*: `XmlToStructFilter` and `XmlToJsonFilter` which can be used in addition to the existing `XMLFileInputReader`. |
| 48 | + |
| 49 | +**XmlToStructFilter** |
| 50 | + |
| 51 | +This _processing filter_ can be used to parse and convert an XML file that you read, for example, using the `LocalBytesArrayInputReader` into a `Struct` record. |
| 52 | +This filter should be preferred to the `XMLFileInputReader` when you need to deal with invalid XML files. |
| 53 | + |
| 54 | +For example, you may want to send invalid XML files into specific _Dead Letter Topic_. |
| 55 | + |
| 56 | +```properties |
| 57 | +filters=ParseXmlDocument |
| 58 | +filters.ParseXmlDocument.type=io.streamthoughts.kafka.connect.filepulse.filter.XmlToStructFilter |
| 59 | +filters.ParseXmlDocument.source=message |
| 60 | +filters.ParseXmlDocument.xml.parser.validating.enabled=true |
| 61 | +filters.ParseXmlDocument.xml.parser.namespace.aware.enabled=true |
| 62 | +filters.ParseXmlDocument.xml.exclude.empty.elements=true |
| 63 | +filters.ParseXmlDocument.xml.exclude.node.attributes=false |
| 64 | +filters.ParseXmlDocument.xml.data.type.inference.enabled=true |
| 65 | +filters.ParseXmlDocument.withOnFailure=SetToErrorTopic |
| 66 | +filters.SetToErrorTopic.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter |
| 67 | +filters.SetToErrorTopic.value=xml-parsing-DLQ |
| 68 | +filters.SetToErrorTopic.field=$topic |
| 69 | +``` |
| 70 | + |
| 71 | +**XmlToJsonFilter** |
| 72 | + |
| 73 | +This _processing filter_ can be used to parse and convert an XML file that you read, for example, using the `LocalBytesArrayInputReader` into a JSON string record. |
| 74 | + |
| 75 | +# Exception Context |
| 76 | + |
| 77 | +When an exception occurs in the _processing filter chain_, Connect FilePulse allows you to access the context of the exception using the `$error` scope from the expression language. |
| 78 | +In previous versions, only the exception message was available (e.g., using `$error.message`). Now, you can retrieve the exception stacktrace as well as the exception class name using: |
| 79 | + |
| 80 | +* `$error.exceptionMessage` |
| 81 | +* `$error.exceptionStacktrace` |
| 82 | +* `$error.exceptionClassName` |
| 83 | + |
| 84 | +The below examples shows how to add the exception information to teh record headers. |
| 85 | + |
| 86 | +```properties |
| 87 | +... |
| 88 | +filters.ParseXmlDocument.withOnFailure=SetToErrorTopic,AppendErrorMessageToHeader,AppendErrorStacktraceToHeader,AppendErrorClassNameToHeader |
| 89 | +filters.AppendErrorMessageToHeader.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter |
| 90 | +filters.AppendErrorMessageToHeader.field=$headers.errors.exception.message |
| 91 | +filters.AppendErrorMessageToHeader.value={{ $error.exceptionMessage }} |
| 92 | +filters.AppendErrorStacktraceToHeader.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter |
| 93 | +filters.AppendErrorStacktraceToHeader.field=$headers.errors.exception.stacktrace |
| 94 | +filters.AppendErrorStacktraceToHeader.value={{ $error.exceptionStacktrace }} |
| 95 | +filters.AppendErrorClassNameToHeader.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter |
| 96 | +filters.AppendErrorClassNameToHeader.field=$headers.errors.exception.message |
| 97 | +filters.AppendErrorClassNameToHeader.value={{ $error.exceptionClassName }} |
| 98 | +``` |
| 99 | + |
| 100 | +## Full Release Notes |
| 101 | + |
| 102 | +Connect File Pulse 2.4 can be downloaded from the [GitHub Releases Page](https://github.com/streamthoughts/kafka-connect-file-pulse/releases/tag/v2.4.0). |
| 103 | + |
| 104 | +### Features |
| 105 | +[67683d5](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/67683d5) feat(expression): add built-in SCeL expression function NOT |
| 106 | +[7fea775](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/7fea775) feat(dataformat): add config to specify a prefix used to prepend XML attributes (#176) |
| 107 | +[4fc2cb9](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/4fc2cb9) feat(expression): add expression function TimestampDiff |
| 108 | +[9d72e47](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/9d72e47) feat(expression): add expression function ToTimestamp |
| 109 | +[0644cb9](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/0644cb9) feat(expressions): add built-in function 'gt' and 'lt' to ScEL |
| 110 | +[e4375c8](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/e4375c8) feat(expressions): add built-in function 'or' and 'and' to ScEL |
| 111 | +[28a6126](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/28a6126) feat(expressions): add built-in function 'if' to ScEL |
| 112 | +[4fe77fd](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/4fe77fd) feat(api): add access to error stacktrace in filter chain |
| 113 | +[b9c0a40](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/b9c0a40) feat(dataformat): add new config prop to exclude node attributes in namespaces (#175) |
| 114 | +[8f648c8](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/8f648c8) feat(dataformat): add new config props to exclude all XML attributes (#174) |
| 115 | +[355b6e4](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/355b6e4) feat(expression): add UnixTimestamp expression function |
| 116 | +[5a62f03](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/5a62f03) feat(filters): add new XmlToStructFilter |
| 117 | +[9cad2fa](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/9cad2fa) feat(filters): add new simple XmlToJsonFilter |
| 118 | +[0e29ce2](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/0e29ce2) feat(plugin): add capability to merge schemas deriving from records |
| 119 | + |
| 120 | +### Improvements & Bugfixes |
| 121 | +[165a908](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/165a908) refactor(expressions): allow functions to not evaluate all expression args |
| 122 | +[4e9f84d](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/4e9f84d) fix(expressions): fix equals SCeL expression should support null argument (#187) |
| 123 | +[7bdc787](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/7bdc787) fix(filesystems): fix regression on AmazonS3Client configuration (#184) |
| 124 | +[d76bac0](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/d76bac0) fix(plugin): refactor InMemoryFileObjectStateBackingStore to use an LRU cache (#183) |
| 125 | +[50200f7](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/50200f7) fix(plugin): fix resources must not be closed while files are not committed |
| 126 | +[17e9efb](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/17e9efb) fix(plugin): fix regression cleanup object files should not be rescheduled (#178) |
| 127 | +[e2f74b2](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/e2f74b2) fix(api): fix schemas should be merged per target topic |
| 128 | +[03bab9a](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/03bab9a) fix(api): enhance mapping to connect schema to handle duplicate schema |
| 129 | +[760d98b](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/760d98b) fix(filters): XmlToJson should support bytes input |
| 130 | +[99c374f](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/99c374f) fix(api): fix schema behavior on array merge |
| 131 | + |
| 132 | +## Sub-Tasks |
| 133 | +[e9cd483](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/e9cd483) fix(build): normalize artefact-ids |
| 134 | +[2b8d260](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/2b8d260) refactor(filters): relocate json packages |
| 135 | +[4d13731](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/4d13731) refactor(filters): cleanup classes |
| 136 | +[f3179a7](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/f3179a7) refactor(expression): refactor expression function api |
| 137 | +[bf3fc31](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/bf3fc31) refactor(expression): reorganize packages for built-in functions |
| 138 | + |
| 139 | +### Docs |
| 140 | +[643469f](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/643469f) site(docs): update documentations |
| 141 | +[be29aae](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/be29aae) docs(site): add new function descriptions |
| 142 | +[2a9a119](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/2a9a119) docs(site): fix missing config property |
| 143 | +[7533d2f](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/7533d2f) docs(site): improve installation guide |
| 144 | +[71a9ebe](https://github.com/streamthoughts/kafka-connect-file-pulse/commit/71a9ebe) docs(site): add doc for defining schema |
| 145 | + |
| 146 | +If you enjoyed reading this post, check out Connect FilePulse at GitHub and give us a ⭐! |
0 commit comments