Skip to content

Commit 8484877

Browse files
committed
feat(filters): add new built-in CSVFilter (#249)
This commit adds a new filter based-on opencsv library for processing CSV values. Resolves: #249
1 parent 2d37f89 commit 8484877

File tree

3 files changed

+414
-0
lines changed

3 files changed

+414
-0
lines changed

connect-file-pulse-filters/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@
6363
<artifactId>connect-api</artifactId>
6464
</dependency>
6565

66+
<dependency>
67+
<groupId>com.opencsv</groupId>
68+
<artifactId>opencsv</artifactId>
69+
<version>5.6</version>
70+
</dependency>
71+
6672
<dependency>
6773
<groupId>io.streamthoughts</groupId>
6874
<artifactId>kafka-connect-filepulse-api</artifactId>
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/*
2+
* Copyright 2022 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import com.opencsv.CSVParser;
22+
import com.opencsv.CSVParserBuilder;
23+
import com.opencsv.ICSVParser;
24+
import org.apache.kafka.common.config.ConfigDef;
25+
import org.apache.kafka.common.config.ConfigException;
26+
27+
import java.io.IOException;
28+
import java.util.Map;
29+
30+
public class CSVFilter extends AbstractDelimitedRowFilter<CSVFilter> {
31+
32+
public final static String PARSER_SEPARATOR_CONFIG = "separator";
33+
private final static String PARSER_SEPARATOR_DOC = "Sets the delimiter to use for separating entries.";
34+
35+
public final static String PARSER_IGNORE_QUOTATIONS_CONFIG = "ignore.quotations";
36+
private final static String PARSER_IGNORE_QUOTATIONS_DOC =
37+
"Sets the ignore quotations mode - if true, quotations are ignored.";
38+
39+
public final static String PARSER_ESCAPE_CHAR_CONFIG = "escape.char";
40+
private final static String PARSER_ESCAPE_CHAR_DOC =
41+
"Sets the character to use for escaping a separator or quote.";
42+
43+
public final static String PARSER_IGNORE_LEADING_WHITESPACE_CONFIG = "ignore.leading.whitespace";
44+
private final static String PARSER_IGNORE_LEADING_WHITESPACE_DOC =
45+
"Sets the ignore leading whitespace setting - if true, white space in "
46+
+ "front of a quote in a field is ignored.";
47+
48+
public final static String PARSER_QUOTE_CHAR_CONFIG = "quote.char";
49+
private final static String PARSER_QUOTE_CHAR_DOC = "Sets the character to use for quoted elements.";
50+
51+
public final static String PARSER_STRICT_QUOTES_CHAR_CONFIG = "strict.quotes";
52+
private final static String PARSER_STRICT_QUOTES_CHAR_DOC =
53+
"Sets the strict quotes setting - if true, characters outside the quotes are ignored.";
54+
private static final String CONFIG_GROUP = "CSV Filter";
55+
56+
private CSVParser parser;
57+
58+
/**
59+
* {@inheritDoc}
60+
*/
61+
@Override
62+
public void configure(final Map<String, ?> configs) {
63+
super.configure(configs);
64+
this.parser = new CSVParserBuilder()
65+
.withSeparator(filterConfig().getString(PARSER_SEPARATOR_CONFIG).charAt(0))
66+
.withIgnoreQuotations(filterConfig().getBoolean(PARSER_IGNORE_QUOTATIONS_CONFIG))
67+
.withEscapeChar(filterConfig().getString(PARSER_ESCAPE_CHAR_CONFIG).charAt(0))
68+
.withIgnoreLeadingWhiteSpace(filterConfig().getBoolean(PARSER_IGNORE_LEADING_WHITESPACE_CONFIG))
69+
.withQuoteChar(filterConfig().getString(PARSER_QUOTE_CHAR_CONFIG).charAt(0))
70+
.withStrictQuotes(filterConfig().getBoolean(PARSER_STRICT_QUOTES_CHAR_CONFIG))
71+
.build();
72+
}
73+
74+
/**
75+
* {@inheritDoc}
76+
*/
77+
@Override
78+
protected String[] parseColumnsValues(final String line) {
79+
try {
80+
return parser.parseLine(line);
81+
} catch (IOException e) {
82+
throw new FilterException("Failed to parse CSV line", e);
83+
}
84+
}
85+
86+
/**
87+
* {@inheritDoc}
88+
*/
89+
@Override
90+
public ConfigDef configDef() {
91+
int filterGroupCounter = 0;
92+
return super.configDef()
93+
.define(
94+
PARSER_SEPARATOR_CONFIG,
95+
ConfigDef.Type.STRING,
96+
String.valueOf(ICSVParser.DEFAULT_SEPARATOR),
97+
new NonEmptyCharacter(),
98+
ConfigDef.Importance.HIGH,
99+
PARSER_SEPARATOR_DOC,
100+
CONFIG_GROUP,
101+
filterGroupCounter++,
102+
ConfigDef.Width.NONE,
103+
PARSER_SEPARATOR_CONFIG
104+
)
105+
.define(
106+
PARSER_IGNORE_QUOTATIONS_CONFIG,
107+
ConfigDef.Type.BOOLEAN,
108+
ICSVParser.DEFAULT_IGNORE_QUOTATIONS,
109+
ConfigDef.Importance.MEDIUM,
110+
PARSER_IGNORE_QUOTATIONS_DOC,
111+
CONFIG_GROUP,
112+
filterGroupCounter++,
113+
ConfigDef.Width.NONE,
114+
PARSER_IGNORE_QUOTATIONS_CONFIG
115+
)
116+
.define(
117+
PARSER_ESCAPE_CHAR_CONFIG,
118+
ConfigDef.Type.STRING,
119+
String.valueOf(ICSVParser.DEFAULT_ESCAPE_CHARACTER),
120+
new NonEmptyCharacter(),
121+
ConfigDef.Importance.MEDIUM,
122+
PARSER_ESCAPE_CHAR_DOC,
123+
CONFIG_GROUP,
124+
filterGroupCounter++,
125+
ConfigDef.Width.NONE,
126+
PARSER_ESCAPE_CHAR_CONFIG
127+
)
128+
.define(
129+
PARSER_IGNORE_LEADING_WHITESPACE_CONFIG,
130+
ConfigDef.Type.BOOLEAN,
131+
ICSVParser.DEFAULT_IGNORE_LEADING_WHITESPACE,
132+
ConfigDef.Importance.MEDIUM,
133+
PARSER_IGNORE_LEADING_WHITESPACE_DOC,
134+
CONFIG_GROUP,
135+
filterGroupCounter++,
136+
ConfigDef.Width.NONE,
137+
PARSER_IGNORE_LEADING_WHITESPACE_CONFIG
138+
)
139+
.define(
140+
PARSER_QUOTE_CHAR_CONFIG,
141+
ConfigDef.Type.STRING,
142+
String.valueOf(ICSVParser.DEFAULT_QUOTE_CHARACTER),
143+
new NonEmptyCharacter(),
144+
ConfigDef.Importance.MEDIUM,
145+
PARSER_QUOTE_CHAR_DOC,
146+
CONFIG_GROUP,
147+
filterGroupCounter++,
148+
ConfigDef.Width.NONE,
149+
PARSER_QUOTE_CHAR_CONFIG
150+
)
151+
.define(
152+
PARSER_STRICT_QUOTES_CHAR_CONFIG,
153+
ConfigDef.Type.BOOLEAN,
154+
ICSVParser.DEFAULT_STRICT_QUOTES,
155+
ConfigDef.Importance.MEDIUM,
156+
PARSER_STRICT_QUOTES_CHAR_DOC,
157+
CONFIG_GROUP,
158+
filterGroupCounter++,
159+
ConfigDef.Width.NONE,
160+
PARSER_STRICT_QUOTES_CHAR_CONFIG
161+
);
162+
}
163+
164+
165+
public static class NonEmptyCharacter implements ConfigDef.Validator {
166+
public NonEmptyCharacter() {}
167+
168+
@Override
169+
public void ensureValid(final String name, final Object o) {
170+
String s = (String)o;
171+
if (s != null) {
172+
if (s.isEmpty()) {
173+
throw new ConfigException(name, o, "Character must be non-empty");
174+
}
175+
if (s.length() > 1) {
176+
throw new ConfigException(
177+
name,
178+
o,
179+
"Expected value to be a single character, but it was a string with a length superior to 1"
180+
);
181+
}
182+
}
183+
184+
}
185+
186+
public String toString() {
187+
return "non-empty character";
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)