Skip to content

Commit 9cad2fa

Browse files
committed
feat(filters): add new simple XmlToJsonFilter
1 parent 0e29ce2 commit 9cad2fa

File tree

4 files changed

+243
-0
lines changed

4 files changed

+243
-0
lines changed

connect-file-pulse-filters/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@
8181
<version>1.1.0</version>
8282
</dependency>
8383

84+
<dependency>
85+
<groupId>org.json</groupId>
86+
<artifactId>json</artifactId>
87+
<version>20210307</version>
88+
</dependency>
89+
8490
<dependency>
8591
<groupId>com.jsoniter</groupId>
8692
<artifactId>jsoniter</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.config;
20+
21+
import org.apache.kafka.common.config.ConfigDef;
22+
23+
import java.util.Map;
24+
25+
public class XmlToJsonFilterConfig extends CommonFilterConfig {
26+
27+
private static final String FILTER = "XmlToJsonFilter";
28+
29+
public static final String XML_PARSER_KEEP_STRINGS_CONFIG = "xml.parser.keep.strings";
30+
public static final String XML_PARSER_KEEP_STRINGS_DOC = "When parsing the XML into JSON, specifies if values should be kept " +
31+
"as strings (true), or if " +
32+
"they should try to be guessed into JSON values (numeric, boolean, string)";
33+
34+
public static final String XML_PARSER_CDATA_TAG_NAME_DEFAULT = "value";
35+
public static final String XML_PARSER_CDATA_TAG_NAME_CONFIG = "xml.parser.cDataTagName";
36+
public static final String XML_PARSER_CDATA_TAG_NAME_DOC = "The name of the key in a JSON Object that indicates " +
37+
"a CDATA section (default: '" + XML_PARSER_CDATA_TAG_NAME_DEFAULT + "').";
38+
39+
40+
/**
41+
* Creates a new {@link XmlToJsonFilterConfig} instance.
42+
*
43+
* @param originals the originals configuration.
44+
*/
45+
public XmlToJsonFilterConfig(final Map<?, ?> originals) {
46+
super(configDef(), originals);
47+
}
48+
49+
public static ConfigDef configDef() {
50+
int filterGroupCounter = 0;
51+
return new ConfigDef(CommonFilterConfig.configDef())
52+
.define(withOverwrite(FILTER, filterGroupCounter++))
53+
.define(withSource(FILTER, filterGroupCounter++))
54+
.define(
55+
XML_PARSER_KEEP_STRINGS_CONFIG,
56+
ConfigDef.Type.BOOLEAN,
57+
false,
58+
ConfigDef.Importance.HIGH,
59+
XML_PARSER_KEEP_STRINGS_DOC,
60+
FILTER,
61+
filterGroupCounter++,
62+
ConfigDef.Width.NONE,
63+
XML_PARSER_KEEP_STRINGS_CONFIG
64+
)
65+
.define(
66+
XML_PARSER_CDATA_TAG_NAME_CONFIG,
67+
ConfigDef.Type.STRING,
68+
XML_PARSER_CDATA_TAG_NAME_DEFAULT,
69+
ConfigDef.Importance.MEDIUM,
70+
XML_PARSER_CDATA_TAG_NAME_DOC,
71+
FILTER,
72+
filterGroupCounter++,
73+
ConfigDef.Width.NONE,
74+
XML_PARSER_CDATA_TAG_NAME_CONFIG
75+
);
76+
}
77+
78+
public boolean getXmlParserKeepStrings() {
79+
return getBoolean(XML_PARSER_KEEP_STRINGS_CONFIG);
80+
}
81+
82+
public String getCDataTagName() {
83+
return getString(XML_PARSER_CDATA_TAG_NAME_CONFIG);
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.config.XmlToJsonFilterConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
23+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
24+
import io.streamthoughts.kafka.connect.filepulse.internal.StringUtils;
25+
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
26+
import org.apache.kafka.common.config.ConfigDef;
27+
import org.json.JSONException;
28+
import org.json.JSONObject;
29+
import org.json.XML;
30+
import org.json.XMLParserConfiguration;
31+
32+
import java.util.Collections;
33+
import java.util.Map;
34+
import java.util.Set;
35+
36+
public class XmlToJsonFilter extends AbstractMergeRecordFilter<XmlToJsonFilter> {
37+
38+
private XmlToJsonFilterConfig config;
39+
40+
private XMLParserConfiguration xmlParserConfiguration;
41+
/**
42+
* {@inheritDoc}
43+
*/
44+
@Override
45+
public ConfigDef configDef() {
46+
return XmlToJsonFilterConfig.configDef();
47+
}
48+
49+
/**
50+
* {@inheritDoc}
51+
*/
52+
@Override
53+
public void configure(final Map<String, ?> props) {
54+
super.configure(props);
55+
config = new XmlToJsonFilterConfig(props);
56+
57+
xmlParserConfiguration = new XMLParserConfiguration()
58+
.withKeepStrings(config.getXmlParserKeepStrings())
59+
.withcDataTagName(config.getCDataTagName());
60+
}
61+
62+
/**
63+
* {@inheritDoc}
64+
*/
65+
@Override
66+
protected RecordsIterable<TypedStruct> apply(final FilterContext context,
67+
final TypedStruct record) throws FilterException {
68+
try {
69+
final String payload = checkIsNotNull(record.get(config.source())).getString();
70+
71+
if (StringUtils.isBlank(payload)) {
72+
return RecordsIterable.empty();
73+
}
74+
75+
final JSONObject xmlJSONObj = XML.toJSONObject(payload, xmlParserConfiguration);
76+
final String jsonString = xmlJSONObj.toString(0);
77+
return RecordsIterable.of(TypedStruct.create().put(config.source(), jsonString));
78+
} catch (JSONException e) {
79+
throw new FilterException("Failed to parse and convert XML document into JSON object", e);
80+
}
81+
}
82+
83+
/**
84+
* {@inheritDoc}
85+
*/
86+
@Override
87+
protected Set<String> overwrite() {
88+
return Collections.singleton(config.source());
89+
}
90+
91+
private TypedValue checkIsNotNull(final TypedValue value) {
92+
if (value.isNull()) {
93+
throw new FilterException(
94+
"Invalid field '" + config.source() + "' was passed through the connector's configuration'. " +
95+
"Cannot parse null or empty value to XML."
96+
);
97+
}
98+
return value;
99+
}
100+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2021 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
22+
import org.junit.Assert;
23+
import org.junit.Test;
24+
25+
import java.util.Collections;
26+
27+
public class XmlToJsonFilterTest {
28+
29+
String XML = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
30+
"<root>\n" +
31+
" <element1 attr=\"foo\">bar</element1>\n" +
32+
" <element2>value</element2>\n" +
33+
" <element3>value</element3>\n" +
34+
" <element4>value1</element4>\n" +
35+
" <element4>value2</element4>\n" +
36+
" <element5></element5>\n" +
37+
"</root>";
38+
39+
@Test
40+
public void should_success_to_convert_xml_to_json() {
41+
final XmlToJsonFilter filter = new XmlToJsonFilter();
42+
filter.configure(Collections.emptyMap());
43+
final TypedStruct input = TypedStruct.create().put("message", XML);
44+
final TypedStruct output = filter.apply(null, input).iterator().next();
45+
46+
Assert.assertNotNull(output);
47+
Assert.assertEquals(
48+
"{\"root\":{\"element1\":{\"attr\":\"foo\",\"value\":\"bar\"},\"element2\":\"value\",\"element3\":\"value\",\"element4\":[\"value1\",\"value2\"],\"element5\":\"\"}}",
49+
output.getString("message")
50+
);
51+
}
52+
}

0 commit comments

Comments
 (0)