Skip to content

Commit df7c71f

Browse files
Agustin Gonzalezfhussonnois
Agustin Gonzalez
authored andcommitted
fix(filters): fix wrong index for DelimitedRowIndex
1 parent e843cff commit df7c71f

File tree

3 files changed

+27
-1
lines changed

3 files changed

+27
-1
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,15 @@ public List<TypedField> fields() {
110110
return ordered;
111111
}
112112

113+
public List<TypedField> fieldsByIndex() {
114+
ArrayList<TypedField> ordered = new ArrayList<>(fields.values());
115+
// order elements in array to match field column index
116+
for (TypedField field: fields.values()) {
117+
ordered.add(field.index(),field);
118+
}
119+
return ordered;
120+
}
121+
113122
void set(final String fieldName, final Schema fieldSchema) {
114123
if (fieldName == null || fieldName.isEmpty()) {
115124
throw new DataException("fieldName cannot be null.");

connect-file-pulse-filters/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/AbstractDelimitedRowFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void configure(final Map<String, ?> configs) {
7575

7676
this.schema = this.configs.schema();
7777
if (schema != null) {
78-
final List<TypedField> fields = schema.fields();
78+
final List<TypedField> fields = schema.fieldsByIndex();
7979
IntStream.range(0, fields.size()).forEach(i -> columnsTypesByIndex.put(i, fields.get(i)));
8080
}
8181
}

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DelimitedRowFilterTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,23 @@ public void should_use_configured_schema() {
158158
Assert.assertTrue(record.getBoolean("c3"));
159159
}
160160

161+
@Test
162+
public void should_use_configured_schema_when_field_names_out_of_order() {
163+
configs.put(READER_FIELD_COLUMNS_CONFIG, "x1:STRING;c2:INTEGER;y3:BOOLEAN");
164+
filter.configure(configs, alias -> null);
165+
RecordsIterable<TypedStruct> output = filter.apply(null, DEFAULT_STRUCT, false);
166+
Assert.assertNotNull(output);
167+
Assert.assertEquals(1, output.size());
168+
169+
final TypedStruct record = output.iterator().next();
170+
Assert.assertEquals(Type.STRING, record.get("x1").type());
171+
Assert.assertEquals(Type.INTEGER, record.get("c2").type());
172+
Assert.assertEquals(Type.BOOLEAN, record.get("y3").type());
173+
Assert.assertEquals("value1", record.getString("x1"));
174+
Assert.assertEquals(2, record.getInt("c2").intValue());
175+
Assert.assertTrue(record.getBoolean("y3"));
176+
}
177+
161178
@Test
162179
public void should_only_convert_non_empty_values_given_schema() {
163180
// Given

0 commit comments

Comments
 (0)